
LangGraph Agents: Build Stateful, Controllable AI Workflows

  • Writer: Leanware Editorial Team
  • Oct 30
  • 8 min read

LangGraph agents are stateful AI workflows built on graph-based control logic. Unlike traditional chain-based approaches, they give you explicit control over execution flow, state persistence, and error recovery. This matters when you need agents that can pause, resume, loop conditionally, or coordinate multiple tasks without losing context.


In this article, we’ll look at what LangGraph agents are, how they differ from LangChain, and how to design them for reliability, control, and scalability in real-world applications.


What Are LangGraph Agents?


Built with LangGraph

LangChain introduced chains and agents for LLM orchestration. Chains work well for linear workflows, but they struggle with complex control flow. LangChain agents use a loop where the LLM decides what to do next, but this loop is implicit and hard to customize.


Problems with LangChain agents:


  • No way to inspect or modify state mid-execution.

  • Limited control over loop termination.

  • Difficult to add conditional branching.

  • No built-in state persistence between runs.

  • Hard to debug when things go wrong.


LangGraph addresses these by making the execution graph explicit. You define nodes (steps) and edges (transitions) directly. The graph structure shows exactly what can happen and when. This gives you control over loops, branching, and state management that LangChain agents don't provide.


Core Advantages: Control, Persistence, Extensibility

Control: You define the exact flow. Conditional edges let you route based on state. You can limit loop iterations, add breakpoints, or insert human approval steps. The graph makes control flow visible and modifiable.


Persistence: LangGraph includes checkpointing. The system saves state after each node executes. If execution fails, you can resume from the last checkpoint. This enables long-running workflows that survive restarts and allows you to pause agents for human input.


Extensibility: Add nodes without restructuring the entire agent. Insert logging, validation, or external API calls at any point. The graph structure makes it easy to modify workflows without breaking existing logic.


Fundamental Concepts & Architecture


State, Nodes, and Edges

LangGraph models workflows as directed graphs. The state is a typed object that flows through the graph. Nodes are functions that read and update state. Edges connect nodes and define execution order.

from typing import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: list
    next_action: str
    iteration_count: int

Each node receives the current state and returns updates:

def reasoning_node(state: AgentState):
    # Process state
    return {"next_action": "call_tool", "iteration_count": state["iteration_count"] + 1}

Edges specify what happens after each node. Use normal edges for sequential flow and conditional edges for branching.


Control Flow & Conditional Edges

Conditional edges route based on state. After a node executes, a routing function examines state and returns the name of the next node:

def should_continue(state: AgentState):
    if state["iteration_count"] > 5:
        return "end"
    elif state["next_action"] == "call_tool":
        return "tool_node"
    else:
        return "generate"

This replaces implicit LLM-controlled loops with explicit, debuggable logic. You can add iteration limits, error handling, or custom routing conditions without modifying node code.
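Because a routing function is plain Python over the state dict, each branch can be checked without compiling a graph or calling an LLM. The sketch below repeats the routing rules from above; `make_state` is a hypothetical helper introduced only for these checks.

```python
from typing import TypedDict


class AgentState(TypedDict):
    messages: list
    next_action: str
    iteration_count: int


def should_continue(state: AgentState) -> str:
    # Same routing rules as above: stop after too many iterations,
    # otherwise follow the action chosen by the previous node.
    if state["iteration_count"] > 5:
        return "end"
    if state["next_action"] == "call_tool":
        return "tool_node"
    return "generate"


def make_state(next_action: str, iteration_count: int) -> AgentState:
    # Hypothetical helper that builds a minimal state for one test case.
    return {"messages": [], "next_action": next_action, "iteration_count": iteration_count}


# Each branch is exercised with a hand-built state.
assert should_continue(make_state("call_tool", 0)) == "tool_node"
assert should_continue(make_state("respond", 2)) == "generate"
assert should_continue(make_state("call_tool", 6)) == "end"  # the limit wins over the tool request
```

Checks like these are cheap to keep in a regular test suite, which is one practical benefit of moving loop control out of the LLM.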


Memory & Persistence

LangGraph separates memory within a run from memory across runs. Within a run, memory is the state object that flows through the graph. Across runs, a checkpointer saves that state so later invocations on the same thread can pick it up.


Checkpointers serialize state to storage (memory, SQLite, PostgreSQL):

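import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver

# Requires the langgraph-checkpoint-sqlite package
conn = sqlite3.connect("agent_state.db", check_same_thread=False)
memory = SqliteSaver(conn)
graph = graph_builder.compile(checkpointer=memory)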

Pass a thread ID with each invocation. State persists across invocations that share the same thread ID:

config = {"configurable": {"thread_id": "user_123"}}
result = graph.invoke(input_data, config=config)

This enables conversation history, multi-turn interactions, and resumable workflows.


Human-in-the-Loop & Moderation

Insert approval steps by adding interruption points. The agent pauses, waits for human input, then resumes:

graph_builder.add_node("approval", approval_node)
graph_builder.add_edge("generate_content", "approval")

# Pausing requires a checkpointer and a thread ID so the run can be resumed later
graph.invoke(input_data, config=config, interrupt_before=["approval"])

Resume after approval:

graph.invoke(None, config=config)  # Continues from interruption point

Use this for content moderation, financial approvals, or any workflow requiring human oversight.


Streaming & Real-Time Execution

LangGraph supports streaming at multiple levels. Stream tokens as the LLM generates them:

# In "messages" mode each item is a (message_chunk, metadata) tuple
for chunk, metadata in graph.stream(input_data, stream_mode="messages"):
    print(chunk.content, end="")

Stream node updates to show progress:

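# In "updates" mode each event is a dict keyed by the name of the node that just ran
for event in graph.stream(input_data, stream_mode="updates"):
    for node_name in event:
        print(f"Node {node_name} completed")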

Both modes are also available asynchronously through graph.astream, which lets one event loop serve several runs concurrently.


Getting Started: Building a Basic LangGraph Agent

Installation & Setup


Install LangGraph and dependencies:

pip install langgraph langchain langchain-openai

Set your OpenAI API key:

export OPENAI_API_KEY="your-key"

Creating a ReAct/Tool-Calling Agent

Build an agent that reasons about tasks and calls tools. Start with state definition:

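from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage

class AgentState(TypedDict):
    # add_messages is a reducer: new messages are appended instead of replacing the list
    messages: Annotated[list, add_messages]
    tool_calls_made: int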

Define tools the agent can use:

from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search internal database for information."""
    # Implementation here
    return f"Results for: {query}"

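@tool
def calculate(expression: str) -> str:
    """Evaluate mathematical expressions."""
    # Warning: eval() runs arbitrary Python. Acceptable for a local demo only;
    # never expose it to untrusted input.
    try:
        result = eval(expression)
        return str(result)
    except Exception:
        return "Invalid expression"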

tools = [search_database, calculate]
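The `calculate` tool receives text chosen by the model, and `eval` will execute whatever Python that text contains. One possible alternative is a small evaluator that walks the parsed expression and only allows basic arithmetic. This is a sketch, not part of LangGraph or LangChain; `safe_calculate` and the operator tables are names introduced here for illustration.

```python
import ast
import operator

# Only these arithmetic operators are allowed; anything else is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def _evaluate(node: ast.AST):
    # Numeric literals (bool is excluded even though it subclasses int).
    if isinstance(node, ast.Constant):
        if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
            return node.value
        raise ValueError("unsupported literal")
    if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
        return _BINARY_OPS[type(node.op)](_evaluate(node.left), _evaluate(node.right))
    if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
        return _UNARY_OPS[type(node.op)](_evaluate(node.operand))
    # Names, calls, attribute access, and everything else end up here.
    raise ValueError("unsupported expression")


def safe_calculate(expression: str) -> str:
    """Evaluate +, -, *, / arithmetic without calling eval()."""
    try:
        tree = ast.parse(expression, mode="eval")
        return str(_evaluate(tree.body))
    except (SyntaxError, ValueError, ZeroDivisionError):
        return "Invalid expression"


print(safe_calculate("25 * 4"))                 # 100
print(safe_calculate("__import__('os').getcwd()"))  # Invalid expression
```

The body of `calculate` could delegate to a function like this while keeping the same `@tool` signature. Exponentiation is left out on purpose, since very large powers can consume unbounded time and memory.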

Create the reasoning node that decides what to do:

from langchain_openai import ChatOpenAI

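def agent_node(state: AgentState):
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_tools = llm.bind_tools(tools)

    response = llm_with_tools.invoke(state["messages"])
    # Count each turn that requests tools so a routing function can enforce a limit
    tool_calls_made = state.get("tool_calls_made", 0) + (1 if response.tool_calls else 0)
    return {"messages": [response], "tool_calls_made": tool_calls_made}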

Add a tool execution node:

from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools)

Wire the graph:

def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return "end"

workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)

workflow.add_edge(START, "agent")  # Equivalent to set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")

app = workflow.compile()

Run the agent:

result = app.invoke({
    "messages": [HumanMessage(content="What is 25 * 4?")],
    "tool_calls_made": 0
})

print(result["messages"][-1].content)
# The final message should contain 100; exact wording varies by model

Running Invocation Loops

The agent loops until it decides to stop. The conditional edge after the agent node checks if tool calls are needed. If yes, it routes to tool execution, then back to the agent. If no, it ends.


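This differs from LangChain agents, where the loop logic is hidden. Here, you control it explicitly. For example, add an iteration limit. The limit only takes effect if some node increments tool_calls_made on each tool-calling turn: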

def should_continue(state: AgentState):
    if state["tool_calls_made"] >= 5:
        return "end"  # Prevent infinite loops
    
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return "end"
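To see the limit in action without an API key, the loop can be simulated in plain Python. The sketch below is a toy model of the agent/tools cycle, not LangGraph itself: messages are plain dicts, and `stubborn_model` is a hypothetical stand-in for an LLM that requests a tool on every turn.

```python
from typing import Callable


def should_continue(state: dict) -> str:
    # Same rule as above: the cap is checked before the tool request.
    if state["tool_calls_made"] >= 5:
        return "end"
    if state["messages"][-1]["tool_calls"]:
        return "tools"
    return "end"


def stubborn_model(state: dict) -> dict:
    # Hypothetical stand-in for the LLM: it asks for a tool on every turn.
    return {"role": "assistant", "tool_calls": [{"name": "search_database"}]}


def run_loop(model: Callable[[dict], dict]) -> dict:
    state = {"messages": [], "tool_calls_made": 0}
    while True:
        # "agent" node: the model produces the next message.
        state["messages"].append(model(state))
        if should_continue(state) == "end":
            return state
        # "tools" node: record a tool result and advance the counter.
        state["messages"].append({"role": "tool", "tool_calls": []})
        state["tool_calls_made"] += 1


final_state = run_loop(stubborn_model)
print(final_state["tool_calls_made"])  # 5
```

Without the counter increment the loop would never terminate for this model, which is why the counter update and the routing check need to be kept in sync.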

Error Handling & Checkpointing

Add checkpointing for fault tolerance:

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "task_001"}}
result = app.invoke(input_data, config=config)

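If a node fails, the last successful checkpoint for the thread is kept. Invoke again with None as input to resume from it. MemorySaver keeps checkpoints in process memory, so use a database-backed checkpointer if runs must survive restarts:

result = app.invoke(None, config=config)  # Resumes from the last checkpoint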

Wrap nodes in try-except for graceful error handling:

def safe_agent_node(state: AgentState):
    try:
        return agent_node(state)
    except Exception as e:
        return {"messages": [AIMessage(content=f"Error: {str(e)}")]}
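Catching every exception and returning an error message hides transient failures such as timeouts, which often succeed on a second attempt. One option is to retry a node a few times before falling back. The decorator below is a hand-rolled sketch using only the standard library; `with_retries` and `flaky_node` are illustrative names, not LangGraph APIs.

```python
import time
from functools import wraps
from typing import Callable


def with_retries(max_attempts: int = 3, base_delay: float = 0.5) -> Callable:
    """Retry a node function with exponential backoff before giving up."""
    def decorator(node: Callable[[dict], dict]) -> Callable[[dict], dict]:
        @wraps(node)
        def wrapper(state: dict) -> dict:
            for attempt in range(1, max_attempts + 1):
                try:
                    return node(state)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller or a fallback handle it
                    # Wait base_delay, then 2x, 4x, ... between attempts.
                    time.sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator


attempts = {"count": 0}


@with_retries(max_attempts=3, base_delay=0.0)
def flaky_node(state: dict) -> dict:
    # Hypothetical node that fails twice before succeeding.
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"status": "ok"}


result = flaky_node({})
print(result, attempts["count"])  # {'status': 'ok'} 3
```

A retry wrapper and the try-except fallback compose: retry inside, then convert the final failure into an error message outside.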

Advanced Agent Patterns & Use Cases


Multi-Agent Systems & Coordination

Build systems where multiple agents work together. Each agent is a separate graph. A coordinator routes tasks between them:

# Agent 1: Research agent
research_graph = build_research_agent()

# Agent 2: Writing agent
writing_graph = build_writing_agent()

# Coordinator
def coordinator_node(state: CoordinatorState):
    task_type = classify_task(state["task"])
    return {"assigned_agent": task_type}

def route_to_agent(state: CoordinatorState):
    return state["assigned_agent"]

coordinator = StateGraph(CoordinatorState)
coordinator.add_node("coordinator", coordinator_node)
coordinator.add_node("research", research_graph)
coordinator.add_node("writing", writing_graph)

coordinator.add_conditional_edges(
    "coordinator",
    route_to_agent,
    {"research": "research", "writing": "writing"}
)

Supervisor & Hierarchical Architectures

A supervisor agent manages sub-agents. The supervisor delegates tasks and aggregates results:

def supervisor_node(state: SupervisorState):
    llm = ChatOpenAI(model="gpt-4o")
    
    prompt = f"Decompose this task into subtasks: {state['task']}"
    response = llm.invoke(prompt)
    
    subtasks = parse_subtasks(response.content)
    return {"subtasks": subtasks, "current_subtask": 0}

def worker_node(state: SupervisorState):
    subtask = state["subtasks"][state["current_subtask"]]
    result = execute_subtask(subtask)
    
    return {
        "results": state["results"] + [result],
        "current_subtask": state["current_subtask"] + 1
    }

def should_continue_supervisor(state: SupervisorState):
    if state["current_subtask"] >= len(state["subtasks"]):
        return "aggregate"
    return "worker"
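The supervisor snippet relies on a `SupervisorState` type and a `parse_subtasks` helper that are not shown. A minimal sketch of both follows, assuming the model returns one subtask per numbered or bulleted line; the field names are inferred from the node code above and the parsing rule is an assumption.

```python
import re
from typing import TypedDict


class SupervisorState(TypedDict):
    task: str
    subtasks: list[str]
    current_subtask: int
    results: list[str]  # must start as [] because worker_node appends to it


def parse_subtasks(text: str) -> list[str]:
    """Extract one subtask per numbered or bulleted line of model output."""
    subtasks = []
    for line in text.splitlines():
        # Accept "1. item", "2) item", "- item", "* item", and "• item".
        match = re.match(r"^\s*(?:\d+[.)]|[-*•])\s+(.*\S)", line)
        if match:
            subtasks.append(match.group(1))
    return subtasks


plan = "Here is the plan:\n1. Gather sources\n2) Draft outline\n- Write summary\n"
print(parse_subtasks(plan))  # ['Gather sources', 'Draft outline', 'Write summary']
```

Free-text parsing like this is fragile. If the model supports structured output, asking for a JSON list of subtasks is usually more reliable.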

Swarm/Parallel Agent Patterns

Execute multiple agents in parallel using fan-out/fan-in:

from langgraph.types import Send

def fan_out(state: SwarmState):
    # Create parallel tasks
    return [
        Send("worker", {"task": task, "worker_id": i})
        for i, task in enumerate(state["tasks"])
    ]

workflow.add_conditional_edges("distribute", fan_out)
workflow.add_node("worker", worker_node)
workflow.add_edge("worker", "aggregate")

All worker nodes execute concurrently. Results collect at the aggregation node.
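For results to collect rather than overwrite each other, the state key that workers write to needs a reducer, declared as the second argument of `Annotated`. The sketch below is a simplified model of that merge step using only the standard library. `SwarmState` and `merge_updates` are illustrative, and the real merging is done by LangGraph internally.

```python
import operator
from functools import reduce
from typing import Annotated, TypedDict, get_type_hints


class SwarmState(TypedDict):
    tasks: list[str]
    # The second Annotated argument is the reducer used to merge updates to this key.
    results: Annotated[list, operator.add]


def merge_updates(current: list, updates: list[dict]) -> list:
    # Toy model of fan-in: fold each worker's partial update into the
    # existing value with the reducer declared on the state key.
    hints = get_type_hints(SwarmState, include_extras=True)
    reducer = hints["results"].__metadata__[0]
    return reduce(reducer, (update["results"] for update in updates), current)


# Each worker returns its result wrapped in a list so operator.add concatenates them.
worker_updates = [{"results": ["a done"]}, {"results": ["b done"]}, {"results": ["c done"]}]
merged = merge_updates([], worker_updates)
print(merged)  # ['a done', 'b done', 'c done']
```

With a plain `results: list` annotation and no reducer, concurrent writes to the same key would conflict instead of accumulating.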


Embedding External Agents

Wrap external agents (OpenAI Assistants, legacy systems) as LangGraph nodes:

import time
from openai import OpenAI

def openai_assistant_node(state: AgentState):
    client = OpenAI()
    
    thread = client.beta.threads.create()
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=state["query"]
    )
    
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id="asst_xxx"
    )
    
    # Poll until the run leaves its in-flight states, pausing between requests
    while run.status in ("queued", "in_progress", "cancelling"):
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
    if run.status != "completed":
        raise RuntimeError(f"Assistant run ended with status: {run.status}")
    
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    result = messages.data[0].content[0].text.value
    
    return {"response": result}

This lets you combine LangGraph's control flow with external agent capabilities.


Memory & Context Management


Short-Term (Session) Memory

Short-term memory is the state object. Messages accumulate in the state as the agent executes:

from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # Reducer appends new messages to the list

The context window limits how much history you can include. Monitor token counts and truncate old messages when necessary.
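One way to truncate is to keep the system message plus as many of the newest messages as fit a token budget. The sketch below uses plain dicts and a rough estimate of four characters per token. Both are assumptions for illustration; a real implementation would use the model's tokenizer.

```python
def estimate_tokens(text: str) -> int:
    # Rough heuristic (assumption): about 4 characters per token.
    return max(1, len(text) // 4)


def trim_history(messages: list[dict], max_tokens: int) -> list[dict]:
    """Keep the first system message plus the newest messages that fit the budget."""
    system = [m for m in messages if m["role"] == "system"][:1]
    budget = max_tokens - sum(estimate_tokens(m["content"]) for m in system)
    kept = []
    # Walk from newest to oldest and stop at the first message that does not fit.
    for message in reversed([m for m in messages if m["role"] != "system"]):
        cost = estimate_tokens(message["content"])
        if cost > budget:
            break
        kept.append(message)
        budget -= cost
    return system + kept[::-1]  # restore chronological order


history = [
    {"role": "system", "content": "You are helpful."},
    {"role": "user", "content": "a" * 40},
    {"role": "assistant", "content": "b" * 40},
    {"role": "user", "content": "c" * 40},
]
trimmed = trim_history(history, max_tokens=25)
print([m["role"] for m in trimmed])  # ['system', 'assistant', 'user']
```

Dropping the oldest messages first preserves the most recent context, at the cost of losing early instructions that were not in the system message.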


Long-Term Memory & Persistent Storage

Store conversation history in a database for retrieval across sessions:

from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vector_store = Chroma(embedding_function=embeddings, persist_directory="./memory")

def retrieve_context(state: AgentState):
    query = state["messages"][-1].content
    relevant_context = vector_store.similarity_search(query, k=3)
    return {"context": relevant_context}

Combine this with checkpointing for full persistence.


Memory Pruning & Summarization

Compress conversation history when it exceeds limits:

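from langchain_core.messages import RemoveMessage

def summarize_history(state: AgentState):
    if len(state["messages"]) > 20:
        llm = ChatOpenAI(model="gpt-4o-mini")
        
        old_messages = state["messages"][:-10]
        summary_prompt = f"Summarize this conversation: {old_messages}"
        summary = llm.invoke(summary_prompt)
        
        # With the add_messages reducer, returned messages are appended, so old
        # messages must be removed explicitly. The summary goes in its own state
        # key ("summary", assumed to be declared on AgentState).
        delete_old = [RemoveMessage(id=m.id) for m in old_messages]
        return {"messages": delete_old, "summary": summary.content}
    return {}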

Best Practices for Production Deployments


Scalability & Fault Tolerance

Run agents in containers with horizontal scaling:

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "agent.py"]

Use persistent checkpointers (PostgreSQL, Redis) instead of in-memory:

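from langgraph.checkpoint.postgres import PostgresSaver

# Requires the langgraph-checkpoint-postgres package
with PostgresSaver.from_conn_string("postgresql://...") as checkpointer:
    checkpointer.setup()  # Creates the checkpoint tables on first run
    app = workflow.compile(checkpointer=checkpointer)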

Deploy multiple instances behind a load balancer. Each instance can handle different thread IDs independently.


Observability & Logging/Tracing

Enable LangSmith for execution tracing:

export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY="your-key"

LangSmith captures every node execution, LLM call, and state transition. This makes debugging production issues significantly easier.


Add custom logging:

import logging

def logged_node(state: AgentState):
    logging.info(f"Executing node with state: {state}")
    result = process(state)
    logging.info(f"Node result: {result}")
    return result
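Logging the full state on every node can be noisy and may leak user content into logs. A decorator that records only the node name, duration, and updated keys is one alternative. This is a sketch using the standard library; `log_node` and `classify` are hypothetical names.

```python
import logging
import time
from functools import wraps

logger = logging.getLogger("agent.nodes")


def log_node(node):
    """Log a node's name, duration, and the state keys it updated."""
    @wraps(node)
    def wrapper(state):
        start = time.perf_counter()
        try:
            update = node(state)
        except Exception:
            # logger.exception includes the traceback in the log record.
            logger.exception("node=%s failed", node.__name__)
            raise
        elapsed_ms = (time.perf_counter() - start) * 1000
        # Only key names are logged, never message contents.
        logger.info("node=%s duration_ms=%.1f updated=%s", node.__name__, elapsed_ms, sorted(update))
        return update
    return wrapper


@log_node
def classify(state: dict) -> dict:
    # Hypothetical node used to show the decorator in action.
    return {"label": "question" if state["text"].endswith("?") else "statement"}


logging.basicConfig(level=logging.INFO)
print(classify({"text": "Is this logged?"}))  # {'label': 'question'}
```

Because the wrapper preserves the function name, the decorated function can be passed to `add_node` the same way as the original.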

Human Oversight & Safeguards

Add content filters before sensitive operations:

def content_filter(state: AgentState):
    response = state["messages"][-1].content
    
    if contains_sensitive_content(response):
        return {"approved": False, "reason": "Contains sensitive content"}
    return {"approved": True}

workflow.add_node("filter", content_filter)
workflow.add_conditional_edges(
    "filter",
    lambda s: "approve" if s["approved"] else "reject",
    {"approve": "send", "reject": "human_review"}
)

Set confidence thresholds for agent decisions:

def check_confidence(state: AgentState):
    if state["confidence"] < 0.8:
        return "human_review"
    return "execute"

Latency & Performance Considerations

Cache LLM responses for repeated queries:

from langchain_community.cache import SQLiteCache
from langchain_core.globals import set_llm_cache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))

Batch tool calls when possible:

def batch_tool_calls(state: AgentState):
    tool_calls = state["pending_tool_calls"]
    results = execute_tools_parallel(tool_calls)
    return {"tool_results": results}
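The `execute_tools_parallel` helper is not defined in the snippet above. One possible shape for it uses a thread pool, which suits I/O-bound tools such as HTTP or database calls. The version below is a sketch under assumptions: it takes an extra `registry` argument mapping tool names to callables, and each call is a dict with `id`, `name`, and `args` keys.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def execute_tools_parallel(
    tool_calls: list[dict],
    registry: dict[str, Callable[..., str]],
    max_workers: int = 4,
) -> list[dict]:
    """Run independent tool calls concurrently and return results in input order."""
    def run_one(call: dict) -> dict:
        try:
            output = registry[call["name"]](**call["args"])
            return {"id": call["id"], "output": output}
        except Exception as exc:
            # One failing tool should not sink the whole batch.
            return {"id": call["id"], "error": str(exc)}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves the order of tool_calls in its results.
        return list(pool.map(run_one, tool_calls))


# Hypothetical tools and calls for demonstration.
registry = {"shout": lambda text: text.upper(), "divide": lambda a, b: str(a / b)}
calls = [
    {"id": "1", "name": "shout", "args": {"text": "hello"}},
    {"id": "2", "name": "divide", "args": {"a": 1, "b": 0}},
]
results = execute_tools_parallel(calls, registry)
print(results[0])  # {'id': '1', 'output': 'HELLO'}
```

Returning errors as data keeps the batch result aligned with the input, so the agent can decide per call whether to retry or report the failure.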

Use cheaper models for routing and expensive models only for final generation:

router_llm = ChatOpenAI(model="gpt-4o-mini")
generator_llm = ChatOpenAI(model="gpt-4o")

Your Next Move

Start with a basic tool-calling agent to understand the graph structure. Add checkpointing once you need persistence. Introduce conditional routing when your workflow requires branching logic.


For production systems, prioritize observability and error handling from the start. Use LangSmith for tracing and PostgreSQL for checkpointing. Add human-in-the-loop approval for high-stakes decisions.


The graph structure scales as your requirements grow. You can add nodes for validation, insert memory layers, or coordinate multiple agents without restructuring existing logic.


You can also connect with our AI engineering team to design, build, or scale LangGraph-based AI systems for production.


Frequently Asked Questions

What are LangGraph Agents?

LangGraph agents are AI workflows built on explicit graph structures. They use nodes (processing steps) and edges (transitions) to control execution flow. This differs from chain-based approaches by making control logic visible and modifiable.

How is LangGraph different from LangChain?

LangGraph extends LangChain with stateful, graph-based control flow. LangChain uses chains and implicit agent loops. LangGraph gives you explicit control over state, branching, loops, and persistence. You define the exact execution path rather than relying on the LLM to control flow.

Does LangGraph support multi-agent systems?

Yes. Build multiple agents as separate graphs and coordinate them using supervisor patterns, message passing, or parallel execution. The graph structure makes it straightforward to route between agents and aggregate results.

Can I use LangGraph with OpenAI tools?

Yes. LangGraph works with OpenAI's function calling and tool use features. You can also wrap OpenAI Assistants API as nodes in LangGraph workflows, combining OpenAI's capabilities with LangGraph's control flow.

Is LangGraph production-ready?

Yes. LangGraph includes checkpointing for fault tolerance, streaming for real-time responses, and observability through LangSmith. Teams use it in production for customer support, document processing, and automated workflows. The explicit graph structure makes it easier to test and debug than implicit agent loops.

