LangGraph Agents: Build Stateful, Controllable AI Workflows
- Leanware Editorial Team

- Oct 30
- 8 min read
LangGraph agents are stateful AI workflows built on graph-based control logic. Unlike traditional chain-based approaches, they give you explicit control over execution flow, state persistence, and error recovery. This matters when you need agents that can pause, resume, loop conditionally, or coordinate multiple tasks without losing context.
In this article, we’ll look at what LangGraph agents are, how they differ from LangChain, and how to design them for reliability, control, and scalability in real-world applications.
What Are LangGraph Agents?

LangChain introduced chains and agents for LLM orchestration. Chains work well for linear workflows, but they struggle with complex control flow. LangChain agents use a loop where the LLM decides what to do next, but this loop is implicit and hard to customize.
Problems with LangChain agents:
No way to inspect or modify state mid-execution.
Limited control over loop termination.
Difficult to add conditional branching.
No built-in state persistence between runs.
Hard to debug when things go wrong.
LangGraph addresses these by making the execution graph explicit. You define nodes (steps) and edges (transitions) directly. The graph structure shows exactly what can happen and when. This gives you control over loops, branching, and state management that LangChain agents don't provide.
Core Advantages: Control, Persistence, Extensibility
Control: You define the exact flow. Conditional edges let you route based on state. You can limit loop iterations, add breakpoints, or insert human approval steps. The graph makes control flow visible and modifiable.
Persistence: LangGraph includes checkpointing. The system saves state after each node executes. If execution fails, you can resume from the last checkpoint. This enables long-running workflows that survive restarts and allows you to pause agents for human input.
Extensibility: Add nodes without restructuring the entire agent. Insert logging, validation, or external API calls at any point. The graph structure makes it easy to modify workflows without breaking existing logic.
Fundamental Concepts & Architecture
State, Nodes, and Edges
LangGraph models workflows as directed graphs. The state is a typed object that flows through the graph. Nodes are functions that read and update state. Edges connect nodes and define execution order.
from typing import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: list
    next_action: str
    iteration_count: int

Each node receives the current state and returns updates:
def reasoning_node(state: AgentState):
    # Process state
    return {"next_action": "call_tool", "iteration_count": state["iteration_count"] + 1}

Edges specify what happens after each node. Use normal edges for sequential flow and conditional edges for branching.
Control Flow & Conditional Edges
Conditional edges route based on state. After a node executes, a routing function examines state and returns the name of the next node:
def should_continue(state: AgentState):
    if state["iteration_count"] > 5:
        return "end"
    elif state["next_action"] == "call_tool":
        return "tool_node"
    else:
        return "generate"

This replaces implicit LLM-controlled loops with explicit, debuggable logic. You can add iteration limits, error handling, or custom routing conditions without modifying node code.
Memory & Persistence
LangGraph separates short-term and long-term memory. Short-term memory is the state object that flows through the graph. Long-term memory uses checkpointers to save state between runs.
Checkpointers serialize state to storage (memory, SQLite, PostgreSQL):
from langgraph.checkpoint.sqlite import SqliteSaver

memory = SqliteSaver.from_conn_string("agent_state.db")
graph = graph_builder.compile(checkpointer=memory)

Each invocation gets a thread ID. State persists across invocations with the same thread ID:
config = {"configurable": {"thread_id": "user_123"}}
result = graph.invoke(input_data, config=config)This enables conversation history, multi-turn interactions, and resumable workflows.
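For example, assuming the messages field uses an appending reducer such as add_messages, two calls that share a thread ID continue the same conversation, while a new thread ID starts from a clean state:

from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "user_123"}}

# Turn 1: state is created and checkpointed under thread "user_123"
graph.invoke({"messages": [HumanMessage(content="My name is Ada.")]}, config=config)

# Turn 2: the saved messages are reloaded, so the agent can answer from history
result = graph.invoke({"messages": [HumanMessage(content="What is my name?")]}, config=config)

# A different thread ID gets its own, empty history
other = graph.invoke({"messages": [HumanMessage(content="What is my name?")]},
                     config={"configurable": {"thread_id": "user_456"}})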
Human-in-the-Loop & Moderation
Insert approval steps by adding interruption points. The agent pauses, waits for human input, then resumes:
graph_builder.add_node("approval", approval_node)
graph_builder.add_edge("generate_content", "approval")
# When approval is needed
graph.invoke(input_data, config=config, interrupt_before=["approval"])

Resume after approval:

graph.invoke(None, config=config)  # Continues from interruption point

Interruption requires compiling the graph with a checkpointer and invoking it with a thread ID, so the paused state can be saved and reloaded. Use this for content moderation, financial approvals, or any workflow requiring human oversight.
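A fuller sketch of the pattern, assuming an in-memory checkpointer and a hypothetical reviewer_approves check:

from langgraph.checkpoint.memory import MemorySaver

graph = graph_builder.compile(checkpointer=MemorySaver(),
                              interrupt_before=["approval"])
config = {"configurable": {"thread_id": "review_42"}}

# Runs up to (but not into) the "approval" node, then pauses
graph.invoke(input_data, config=config)

# Inspect what the agent produced so far before deciding
snapshot = graph.get_state(config)
print(snapshot.values)   # current state
print(snapshot.next)     # nodes that will run on resume, e.g. ("approval",)

# If the human approves, resume from the saved checkpoint
if reviewer_approves(snapshot.values):   # hypothetical review function
    graph.invoke(None, config=config)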
Streaming & Real-Time Execution
LangGraph supports streaming at multiple levels. Stream tokens as the LLM generates them:
for chunk in graph.stream(input_data, stream_mode="messages"):
print(chunk.content, end="")Stream node updates to show progress:
for event in graph.stream(input_data, stream_mode="updates"):
print(f"Node {event['node']} completed")This works with async execution for concurrent operations.
Getting Started: Building a Basic LangGraph Agent
Installation & Setup
Install LangGraph and dependencies:
pip install langgraph langchain langchain-openai

Set your OpenAI API key:

export OPENAI_API_KEY="your-key"

Creating a ReAct/Tool-Calling Agent
Build an agent that reasons about tasks and calls tools. Start with state definition:
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import HumanMessage, AIMessage

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # reducer that appends new messages
    tool_calls_made: int

Define tools the agent can use:
from langchain_core.tools import tool

@tool
def search_database(query: str) -> str:
    """Search internal database for information."""
    # Implementation here
    return f"Results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Evaluate mathematical expressions."""
    try:
        result = eval(expression)  # demo only; avoid eval on untrusted input
        return str(result)
    except Exception:
        return "Invalid expression"

tools = [search_database, calculate]

Create the reasoning node that decides what to do:
from langchain_openai import ChatOpenAI

def agent_node(state: AgentState):
    llm = ChatOpenAI(model="gpt-4o")
    llm_with_tools = llm.bind_tools(tools)
    response = llm_with_tools.invoke(state["messages"])
    # Count a tool round only when the model actually requested tools
    tool_calls_made = state.get("tool_calls_made", 0) + (1 if response.tool_calls else 0)
    return {"messages": [response], "tool_calls_made": tool_calls_made}

Add a tool execution node:
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools)

Wire the graph:
def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return "end"

workflow = StateGraph(AgentState)
workflow.add_node("agent", agent_node)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
workflow.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
workflow.add_edge("tools", "agent")
app = workflow.compile()

Run the agent:
result = app.invoke({
    "messages": [HumanMessage(content="What is 25 * 4?")],
    "tool_calls_made": 0
})
print(result["messages"][-1].content)
# Output: 100

Running Invocation Loops
The agent loops until it decides to stop. The conditional edge after the agent node checks if tool calls are needed. If yes, it routes to tool execution, then back to the agent. If no, it ends.
This is different from LangChain agents where the loop logic is hidden. Here, you control it explicitly. Add iteration limits:
def should_continue(state: AgentState):
    if state["tool_calls_made"] >= 5:
        return "end"  # Prevent infinite loops
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return "end"

Error Handling & Checkpointing
Add checkpointing for fault tolerance:
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "task_001"}}
result = app.invoke(input_data, config=config)If execution fails, resume from the last checkpoint:
result = app.invoke(None, config=config)  # Resumes automatically

Wrap nodes in try-except for graceful error handling:
def safe_agent_node(state: AgentState):
    try:
        return agent_node(state)
    except Exception as e:
        return {"messages": [AIMessage(content=f"Error: {str(e)}")]}

Advanced Agent Patterns & Use Cases
Multi-Agent Systems & Coordination
Build systems where multiple agents work together. Each agent is a separate graph. A coordinator routes tasks between them:
# Agent 1: Research agent
research_graph = build_research_agent()

# Agent 2: Writing agent
writing_graph = build_writing_agent()

# Coordinator
def coordinator_node(state: CoordinatorState):
    task_type = classify_task(state["task"])
    return {"assigned_agent": task_type}

def route_to_agent(state: CoordinatorState):
    return state["assigned_agent"]

coordinator = StateGraph(CoordinatorState)
coordinator.add_node("coordinator", coordinator_node)
coordinator.add_node("research", research_graph)
coordinator.add_node("writing", writing_graph)
coordinator.add_conditional_edges(
    "coordinator",
    route_to_agent,
    {"research": "research", "writing": "writing"}
)
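The coordinator snippet assumes a state type the article does not define. A minimal version might look like this (build_research_agent, build_writing_agent, and classify_task remain placeholders):

from typing import TypedDict

class CoordinatorState(TypedDict):
    task: str             # the incoming request to classify and delegate
    assigned_agent: str   # "research" or "writing", set by coordinator_node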
Supervisor & Hierarchical Architectures
A supervisor agent manages sub-agents. The supervisor delegates tasks and aggregates results:
def supervisor_node(state: SupervisorState):
    llm = ChatOpenAI(model="gpt-4o")
    prompt = f"Decompose this task into subtasks: {state['task']}"
    response = llm.invoke(prompt)
    subtasks = parse_subtasks(response.content)
    return {"subtasks": subtasks, "current_subtask": 0}

def worker_node(state: SupervisorState):
    subtask = state["subtasks"][state["current_subtask"]]
    result = execute_subtask(subtask)
    return {
        "results": state["results"] + [result],
        "current_subtask": state["current_subtask"] + 1
    }

def should_continue_supervisor(state: SupervisorState):
    if state["current_subtask"] >= len(state["subtasks"]):
        return "aggregate"
return "worker"Swarm/Parallel Agent Patterns
Swarm/Parallel Agent Patterns
Execute multiple agents in parallel using fan-out/fan-in:
from langgraph.types import Send  # older releases expose Send from langgraph.constants

def fan_out(state: SwarmState):
    # Create parallel tasks
    return [
        Send("worker", {"task": task, "worker_id": i})
        for i, task in enumerate(state["tasks"])
    ]

workflow.add_conditional_edges("distribute", fan_out)
workflow.add_node("worker", worker_node)
workflow.add_edge("worker", "aggregate")

All worker nodes execute concurrently. Results collect at the aggregation node.
Embedding External Agents
Wrap external agents (OpenAI Assistants, legacy systems) as LangGraph nodes:
import time
from openai import OpenAI

def openai_assistant_node(state: AgentState):
    client = OpenAI()
    thread = client.beta.threads.create()
    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="user",
        content=state["query"]
    )
    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id="asst_xxx"
    )
    # Poll until the run reaches a terminal status
    while run.status not in ("completed", "failed", "cancelled", "expired"):
        time.sleep(1)  # avoid a tight busy-wait loop
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)
    messages = client.beta.threads.messages.list(thread_id=thread.id)
    result = messages.data[0].content[0].text.value
    return {"response": result}

This lets you combine LangGraph's control flow with external agent capabilities.
Memory & Context Management
Short-Term (Session) Memory
Short-term memory is the state object. Messages accumulate in the state as the agent executes:
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # reducer that appends new messages

The context window limits how much history you can include. Monitor token counts and truncate old messages when necessary.
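A lightweight approach is to trim the window you send to the model while leaving the stored history intact. A sketch using a rough character-count budget (the 12,000-character limit is an arbitrary placeholder):

MAX_CHARS = 12_000  # rough proxy for a token budget; tune for your model

def build_prompt_window(messages: list) -> list:
    """Keep the most recent messages that fit in the budget."""
    kept, total = [], 0
    for message in reversed(messages):
        total += len(message.content)
        if total > MAX_CHARS:
            break
        kept.append(message)
    return list(reversed(kept))

def agent_node(state: AgentState):
    llm = ChatOpenAI(model="gpt-4o")
    # Send only the trimmed window to the model; the full history stays in state
    response = llm.invoke(build_prompt_window(state["messages"]))
    return {"messages": [response]}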
Long-Term Memory & Persistent Storage
Store conversation history in a database for retrieval across sessions:
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vector_store = Chroma(embedding_function=embeddings, persist_directory="./memory")

def retrieve_context(state: AgentState):
    query = state["messages"][-1].content
    relevant_context = vector_store.similarity_search(query, k=3)
    return {"context": relevant_context}

Combine this with checkpointing for full persistence.
Memory Pruning & Summarization
Compress conversation history when it exceeds limits:
from langchain_core.messages import RemoveMessage

def summarize_history(state: AgentState):
    if len(state["messages"]) > 20:
        llm = ChatOpenAI(model="gpt-4o-mini")
        old_messages = state["messages"][:-10]
        summary_prompt = f"Summarize this conversation: {old_messages}"
        summary = llm.invoke(summary_prompt)
        # With the add_messages reducer, old messages must be removed explicitly;
        # the summary is appended and the recent messages remain in state
        return {"messages": [RemoveMessage(id=m.id) for m in old_messages] + [summary]}
    return {}

Best Practices for Production Deployments
Scalability & Fault Tolerance
Run agents in containers with horizontal scaling:
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "agent.py"]Use persistent checkpointers (PostgreSQL, Redis) instead of in-memory:
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = workflow.compile(checkpointer=checkpointer)

Deploy multiple instances behind a load balancer. Each instance can handle different thread IDs independently.
Observability & Logging/Tracing
Enable LangSmith for execution tracing:
export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY="your-key"

LangSmith captures every node execution, LLM call, and state transition. This makes debugging production issues significantly easier.
Add custom logging:
import logging

def logged_node(state: AgentState):
    logging.info(f"Executing node with state: {state}")
    result = process(state)
    logging.info(f"Node result: {result}")
    return result

Human Oversight & Safeguards
Add content filters before sensitive operations:
def content_filter(state: AgentState):
    response = state["messages"][-1].content
    if contains_sensitive_content(response):
        return {"approved": False, "reason": "Contains sensitive content"}
    return {"approved": True}

workflow.add_node("filter", content_filter)
workflow.add_conditional_edges(
    "filter",
    lambda s: "approve" if s["approved"] else "reject",
    {"approve": "send", "reject": "human_review"}
)

Set confidence thresholds for agent decisions:
def check_confidence(state: AgentState):
    if state["confidence"] < 0.8:
        return "human_review"
    return "execute"

Latency & Performance Considerations
Cache LLM responses for repeated queries:
from langchain_community.cache import SQLiteCache
from langchain.globals import set_llm_cache

set_llm_cache(SQLiteCache(database_path=".langchain.db"))

Batch tool calls when possible:
def batch_tool_calls(state: AgentState):
    tool_calls = state["pending_tool_calls"]
    results = execute_tools_parallel(tool_calls)
    return {"tool_results": results}

Use cheaper models for routing and expensive models only for final generation:
router_llm = ChatOpenAI(model="gpt-4o-mini")
generator_llm = ChatOpenAI(model="gpt-4o")Your Next Move
Start with a basic tool-calling agent to understand the graph structure. Add checkpointing once you need persistence. Introduce conditional routing when your workflow requires branching logic.
For production systems, prioritize observability and error handling from the start. Use LangSmith for tracing and PostgreSQL for checkpointing. Add human-in-the-loop approval for high-stakes decisions.
The graph structure scales as your requirements grow. You can add nodes for validation, insert memory layers, or coordinate multiple agents without restructuring existing logic.
You can also connect with our AI engineering team to design, build, or scale LangGraph-based AI systems for production.
Frequently Asked Questions
What are LangGraph Agents?
LangGraph agents are AI workflows built on explicit graph structures. They use nodes (processing steps) and edges (transitions) to control execution flow. This differs from chain-based approaches by making control logic visible and modifiable.
How is LangGraph different from LangChain?
LangGraph extends LangChain with stateful, graph-based control flow. LangChain uses chains and implicit agent loops. LangGraph gives you explicit control over state, branching, loops, and persistence. You define the exact execution path rather than relying on the LLM to control flow.
Does LangGraph support multi-agent systems?
Yes. Build multiple agents as separate graphs and coordinate them using supervisor patterns, message passing, or parallel execution. The graph structure makes it straightforward to route between agents and aggregate results.
Can I use LangGraph with OpenAI tools?
Yes. LangGraph works with OpenAI's function calling and tool use features. You can also wrap OpenAI Assistants API as nodes in LangGraph workflows, combining OpenAI's capabilities with LangGraph's control flow.
Is LangGraph production-ready?
Yes. LangGraph includes checkpointing for fault tolerance, streaming for real-time responses, and observability through LangSmith. Teams use it in production for customer support, document processing, and automated workflows. The explicit graph structure makes it easier to test and debug than implicit agent loops.




