
LangGraph RAG: How to Build Agentic Retrieval-Augmented Generation

  • Writer: Leanware Editorial Team
  • 6 hours ago

Most RAG workflows are linear: retrieve documents, feed them to an LLM, and generate an answer. That works until you need retries, conditional logic, or different paths based on context.


LangGraph treats RAG as a graph of nodes and edges. Each node handles a task (retrieval, grading, or generation), and edges control flow based on results. That makes it easier to adapt and retry parts of the workflow without rewriting the whole pipeline.


In this guide, we’ll build a practical RAG system with LangGraph, walk through the core concepts, implement the workflow step by step, and explore patterns like adaptive routing and self‑correction. 


This is for anyone who has worked with LangChain or basic RAG pipelines and wants more control over how the workflow executes.


Why Use LangGraph for RAG?


Standard RAG implementations use LangChain to chain components together: load documents, split them into chunks, embed them, store vectors, retrieve similar chunks, and pass them to an LLM. This pipeline is deterministic and easy to understand, but it has no flexibility.


If your retriever returns irrelevant documents, the chain continues anyway. If the LLM generates a response that contradicts the retrieved context, you can't catch it mid-stream. If different query types need different retrieval strategies, you're building multiple separate chains.


Chains also lack visibility. When something breaks, you debug by adding print statements or hoping your logging captures the right information. There's no structured way to inspect state between steps or branch based on intermediate results.


Benefits of Graph-Based Orchestration

LangGraph represents your workflow as a directed graph. Nodes are functions that transform state. Edges connect nodes, and conditional edges let you route based on logic you define.


This structure gives you advantages like:


  • Conditional routing: Route queries to different retrievers based on document type or user intent

  • Error recovery: Retry retrieval with modified queries if initial results score poorly

  • State inspection: Access and modify the graph state at any point

  • Reusable nodes: Define retrieval or grading logic once and reuse it across different flows

  • Visual debugging: LangGraph can export your workflow as a diagram, making logic explicit


LangGraph integrates with LangChain components, so you can reuse existing retrievers, document loaders, and embeddings. It also works with LangSmith for tracing and debugging production systems.


Key RAG Architectures

Three RAG patterns map well onto LangGraph's nodes and conditional edges:


Adaptive RAG routes queries dynamically. A simple question might skip retrieval and use the LLM's parametric knowledge. A complex question might trigger vector search. A time-sensitive query could route to web search instead of your vector store.


Corrective RAG evaluates retrieved documents before generation. If documents score poorly for relevance, the system can rewrite the query and retry retrieval, or fall back to web search.


Self-Reflective RAG generates an answer, then evaluates it for hallucinations or factual errors. If the answer fails validation, the system can regenerate with additional context or constraints.

These patterns aren't exclusive. You can combine them in a single graph to handle different scenarios.


What You'll Build


Adaptive RAG System

The example system is a technical documentation Q&A tool. It retrieves API and system information, grades documents for relevance, and adapts when retrieval fails. If initial results are poor, the system automatically rewrites queries.


The workflow:


  1. Receive query: A question is submitted.

  2. Retrieve documents: The vector store is queried for relevant chunks.

  3. Grade relevance: Retrieved documents are scored against the query.

  4. Conditional routing: If documents are relevant, an answer is generated. If not, the query is rewritten and retrieval is retried.

  5. Generate response: Filtered documents are passed to the LLM.

  6. Return answer: The final response is streamed or returned.


The graph structure makes each step explicit. State can be inspected after retrieval, queries can be adjusted before regeneration, and additional validation nodes can be inserted without restructuring the entire pipeline.


Prerequisites & Setup

You need Python 3.10 or later. Install core dependencies:

pip install langgraph langchain langchain-openai

Set your OpenAI API key:

export OPENAI_API_KEY="your-key-here"

Store this in a .env file and load it with python-dotenv for local development.


Choosing a Vector Store

FAISS: Free, local, fast for prototyping. No persistence by default. Use this for development.


Pinecone: Managed, serverless, handles scale. Has a free tier. Use this for production if you want low operational overhead.


Weaviate: Self-hosted or cloud, supports hybrid search. Use this if you need more control over infrastructure.


Qdrant: Similar to Weaviate, strong performance, good Docker support. Use this if you're deploying on Kubernetes.


This tutorial uses InMemoryVectorStore to keep things simple. You can swap it for any LangChain-supported vector store without changing the graph logic.


Core Concepts: Graph, State & Nodes


Defining the State

LangGraph uses a typed state object to pass data between nodes. For RAG, your state tracks the query, retrieved documents, relevance scores, and the final answer.

from typing import TypedDict, List
from langchain_core.documents import Document

class GraphState(TypedDict):
    question: str
    documents: List[Document]
    generation: str
    retry_count: int

Nodes read from state and return updates. LangGraph merges these updates automatically.
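
To make the merge step concrete, here is a plain-Python sketch of what happens to the keys a node returns. It assumes the default behavior, where no reducer is declared on the state and each returned key simply overwrites the previous value. `apply_update` is a hypothetical helper for illustration, not part of LangGraph.

```python
from typing import Any, Dict


def apply_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new state in which keys from `update` replace those in `state`."""
    return {**state, **update}


state = {
    "question": "How do I authenticate?",
    "documents": [],
    "generation": "",
    "retry_count": 0,
}

# A retrieval-style node returns only the keys it changed.
update = {"documents": ["<auth_guide chunk>"]}
state = apply_update(state, update)

print(state["question"])   # untouched key is carried over
print(state["documents"])  # returned key is replaced
```

The practical consequence is that a node only needs to return the fields it changed; everything else in the state survives the step.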


Designing Nodes & Tools

Each node is a Python function that takes state as input and returns a dictionary of updates. For example, a retrieval node queries your vector store and adds documents to state:

def retrieve(state: GraphState):
    question = state["question"]
    documents = retriever.invoke(question)
    return {"documents": documents, "retry_count": 0}

You wrap external tools (retrievers, LLMs, APIs) inside nodes. This keeps the graph logic clean and makes nodes reusable.


Conditional Edges & Control Flow

Edges connect nodes. Conditional edges route based on state. For example, after grading documents, you might route to generation if documents are relevant, or to query rewriting if they're not:

def decide_next_step(state: GraphState):
    if len(state["documents"]) > 0:
        return "generate"
    elif state["retry_count"] < 1:
        return "rewrite"
    else:
        return "generate"

This function returns a string that maps to a node name. LangGraph uses it to determine the next step.
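
Because a routing function is ordinary Python over a dictionary, it can be checked without calling an LLM or a vector store. The sketch below repeats the routing function from this section and runs it against hand-built states; the `cases` table is an illustrative assumption about which situations are worth covering.

```python
def decide_next_step(state: dict) -> str:
    """Same routing logic as above, typed against a plain dict for testing."""
    if len(state["documents"]) > 0:
        return "generate"
    elif state["retry_count"] < 1:
        return "rewrite"
    else:
        return "generate"


# Each case: (documents kept by grading, rewrites so far, expected next node)
cases = [
    (["doc"], 0, "generate"),  # relevant documents found
    ([], 0, "rewrite"),        # nothing relevant, retry budget left
    ([], 1, "generate"),       # nothing relevant, retry budget spent
]

for documents, retry_count, expected in cases:
    state = {
        "question": "q",
        "documents": documents,
        "generation": "",
        "retry_count": retry_count,
    }
    assert decide_next_step(state) == expected

print("all routing cases pass")
```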


Implementation Walkthrough

We'll build a system that answers questions about API endpoints and system configuration. The system grades retrieved documents and rewrites queries when initial retrieval fails.


Imports & Setup

from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.documents import Document
from langgraph.graph import StateGraph, START, END
from typing_extensions import List, TypedDict

Sample Documentation

We'll use technical documentation about an API system:

# Sample technical documentation
docs_content = [
    {
        "content": """REST API Authentication
        
The API uses JWT (JSON Web Tokens) for authentication. To authenticate:

1. Send POST request to /api/v1/auth/login with credentials
2. Receive JWT token in response
3. Include token in Authorization header: Bearer <token>
4. Tokens expire after 24 hours

Example:
POST /api/v1/auth/login
{"username": "user", "password": "pass"}

Response:
{"token": "eyJhbGc...", "expires_in": 86400}""",
        "metadata": {"source": "auth_guide", "section": "authentication"}
    },
    {
        "content": """Rate Limiting

API requests are rate limited to prevent abuse:

- 100 requests per minute per API key
- 1000 requests per hour per API key
- Rate limit headers included in responses:
  X-RateLimit-Limit: Maximum requests allowed
  X-RateLimit-Remaining: Requests remaining
  X-RateLimit-Reset: Time when limit resets

Exceeding rate limits returns HTTP 429 (Too Many Requests).""",
        "metadata": {"source": "rate_limits", "section": "limits"}
    },
    {
        "content": """WebSocket Connections

Real-time data streaming via WebSocket:

- Endpoint: wss://api.example.com/v1/stream
- Requires authentication via query parameter: ?token=<jwt>
- Sends JSON messages with event types
- Automatic reconnection on disconnect
- Ping/pong heartbeat every 30 seconds

Connection example:
const ws = new WebSocket('wss://api.example.com/v1/stream?token=<jwt>');""",
        "metadata": {"source": "websocket_guide", "section": "realtime"}
    },
    {
        "content": """Error Handling

Standard HTTP error codes:

400 Bad Request - Invalid request format or parameters
401 Unauthorized - Missing or invalid authentication
403 Forbidden - Valid auth but insufficient permissions
404 Not Found - Endpoint or resource doesn't exist
429 Too Many Requests - Rate limit exceeded
500 Internal Server Error - Server-side error
503 Service Unavailable - Service temporarily down

Error responses include:
{"error": "error_code", "message": "description", "details": {}}""",
        "metadata": {"source": "error_guide", "section": "errors"}
    },
    {
        "content": """Data Models

User object structure:
{
  "id": "string (UUID)",
  "username": "string",
  "email": "string",
  "created_at": "ISO 8601 timestamp",
  "roles": ["string array"],
  "status": "active|suspended|deleted"
}

Project object structure:
{
  "id": "string (UUID)",
  "name": "string",
  "owner_id": "string (user UUID)",
  "created_at": "ISO 8601 timestamp",
  "settings": {}
}""",
        "metadata": {"source": "data_models", "section": "schemas"}
    },
    {
        "content": """API Endpoints Reference

GET /api/v1/users - List all users (paginated)
GET /api/v1/users/{id} - Get specific user
POST /api/v1/users - Create new user
PUT /api/v1/users/{id} - Update user
DELETE /api/v1/users/{id} - Delete user

GET /api/v1/projects - List projects
GET /api/v1/projects/{id} - Get project details
POST /api/v1/projects - Create project
PUT /api/v1/projects/{id} - Update project

All endpoints require authentication except /auth/login.""",
        "metadata": {"source": "endpoints", "section": "reference"}
    }
]

# Convert to Document objects
documents = [
    Document(page_content=doc["content"], metadata=doc["metadata"])
    for doc in docs_content
]

Indexing & Retriever Creation

# Create embeddings and vector store
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
vectorstore = InMemoryVectorStore(embeddings)
_ = vectorstore.add_documents(documents=documents)

# Create retriever
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

print(f"Indexed {len(documents)} documents")
# Output: Indexed 6 documents

State Definition

class GraphState(TypedDict):
    question: str
    documents: List[Document]
    generation: str
    retry_count: int

Node Definitions

Retrieval node queries the vector store:

def retrieve_node(state: GraphState):
    """Retrieve documents from vector store."""
    question = state["question"]
    documents = retriever.invoke(question)
    return {"documents": documents, "retry_count": 0}

Grading node evaluates document relevance:

def grade_documents(state: GraphState):
    """Score documents for relevance to the question."""
    question = state["question"]
    documents = state["documents"]
    
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    filtered_docs = []
    for doc in documents:
        prompt = f"""Is this document relevant to answering the question?
        
Question: {question}

Document: {doc.page_content[:300]}...

Answer only 'yes' or 'no'."""
        
        response = llm.invoke(prompt)
        if "yes" in response.content.lower():
            filtered_docs.append(doc)
    
    print(f"Graded {len(documents)} documents, kept {len(filtered_docs)}")
    return {"documents": filtered_docs}
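
The substring check in the grading node accepts any reply that contains "yes", including a reply such as "No, although yes in part". One possible tightening is to look only at the first word of the reply. This is a sketch, and `parse_yes_no` is a hypothetical helper name, not something LangChain provides.

```python
import re


def parse_yes_no(reply: str) -> bool:
    """Return True only when the reply starts with the word 'yes'."""
    # Allow leading whitespace and an optional opening quote, then require
    # a whole word so that 'yesterday' does not count as 'yes'.
    match = re.match(r"\s*['\"]?(yes|no)\b", reply, flags=re.IGNORECASE)
    return bool(match) and match.group(1).lower() == "yes"


reply = "No, although yes in part"
print("yes" in reply.lower())  # substring check treats this as relevant
print(parse_yes_no(reply))     # first-word check treats it as not relevant
```

Inside the grading loop, the condition would become `if parse_yes_no(response.content):`. Replies that match neither word are treated as "no", which drops the document rather than keeping it on an ambiguous answer.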

Query rewriting node reformulates questions when retrieval fails:

def rewrite_query(state: GraphState):
    """Rewrite the query to improve retrieval."""
    question = state["question"]
    
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    prompt = f"""Rewrite this question to be more specific and include technical keywords:

Original: {question}

Rewritten:"""
    
    response = llm.invoke(prompt)
    rewritten = response.content.strip()
    print(f"Rewrote: '{question}' → '{rewritten}'")
    
    # Retrieve with new query
    documents = retriever.invoke(rewritten)
    return {
        "documents": documents,
        "question": rewritten,
        "retry_count": state["retry_count"] + 1
    }

Generation node produces the final answer:

def generate_node(state: GraphState):
    """Generate answer using retrieved context."""
    question = state["question"]
    documents = state["documents"]
    
    if not documents:
        return {"generation": "I don't have enough information to answer this question."}
    
    context = "\n\n".join([
        f"[Source: {doc.metadata.get('source', 'unknown')}]\n{doc.page_content}" 
        for doc in documents
    ])
    
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    prompt = f"""Answer the question using only the provided context. Be specific and cite sources.

Context:
{context}

Question: {question}

Answer:"""
    
    response = llm.invoke(prompt)
    return {"generation": response.content}

Conditional Routing Logic

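# Rewrite the query when grading keeps fewer documents than this
MIN_RELEVANT_DOCS = 2

def decide_next_step(state: GraphState):
    """Route to generation or query rewriting based on document quality."""
    if len(state["documents"]) >= MIN_RELEVANT_DOCS:
        return "generate"
    elif state["retry_count"] < 1:
        return "rewrite"
    else:
        # Retry budget spent: answer with whatever survived grading
        return "generate"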

Graph Construction

# Build graph
workflow = StateGraph(GraphState)

# Add nodes
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("grade", grade_documents)
workflow.add_node("rewrite", rewrite_query)
workflow.add_node("generate", generate_node)

# Define flow
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "grade")
workflow.add_conditional_edges(
    "grade",
    decide_next_step,
    {
        "generate": "generate",
        "rewrite": "rewrite"
    }
)
workflow.add_edge("rewrite", "grade")
workflow.add_edge("generate", END)

# Compile
app = workflow.compile()

Running the Graph

Test with a clear question:

result = app.invoke({
    "question": "How do I authenticate with the API?",
    "documents": [],
    "generation": "",
    "retry_count": 0
})

print(f"\nAnswer: {result['generation']}")

Output:

Graded 3 documents, kept 2

Answer: To authenticate with the API, send a POST request to /api/v1/auth/login with your credentials (username and password). You'll receive a JWT token in the response that's valid for 24 hours. Include this token in the Authorization header as "Bearer <token>" for all subsequent API requests. [Source: auth_guide]

Test with a vague question:

result = app.invoke({
    "question": "What about limits?",
    "documents": [],
    "generation": "",
    "retry_count": 0
})

Output:

Graded 3 documents, kept 1
Rewrote: 'What about limits?' → 'What are the API rate limits and request quotas?'
Graded 3 documents, kept 2

Answer: The API enforces rate limits of 100 requests per minute and 1000 requests per hour per API key. Rate limit information is included in response headers (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset). Exceeding these limits returns HTTP 429 (Too Many Requests). [Source: rate_limits]

The graph detected poor initial retrieval (only 1 relevant document), rewrote the query to be more specific, retrieved better documents, and generated a complete answer.


Comparison: Chain vs Graph

A traditional chain with the vague query "What about limits?":

# Traditional RAG (no adaptation)
docs = retriever.invoke("What about limits?")
context = "\n\n".join([d.page_content for d in docs])
llm = ChatOpenAI(model="gpt-4o", temperature=0)
answer = llm.invoke(f"Context: {context}\n\nQuestion: What about limits?")

print(answer.content)
# Output: Vague answer mixing rate limits with other unrelated content

The graph approach:


  1. Retrieved 3 documents

  2. Grading kept only 1 relevant document

  3. Detected insufficient context

  4. Rewrote query to be more specific

  5. Retrieved 3 new documents

  6. Kept 2 relevant documents

  7. Generated accurate, focused answer


This adaptive behavior separates graph-based RAG from linear chains.


Debugging & Visualization

Visualize the graph structure:

from IPython.display import Image, display

display(Image(app.get_graph().draw_mermaid_png()))

Stream execution to observe each step:

for event in app.stream({
    "question": "How do WebSocket connections work?",
    "documents": [],
    "generation": "",
    "retry_count": 0
}):
    print(event)
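
With the default stream mode, each event is a dictionary keyed by the node that just ran, and the value is that node's state update. Printing raw events gets noisy once documents are involved, so a small formatter can help. In the sketch below, `summarize_event` is a hypothetical helper and `sample_events` is hand-written stand-in data, not captured output.

```python
def summarize_event(event: dict) -> str:
    """Describe which node ran and which state keys it updated."""
    parts = []
    for node_name, update in event.items():
        update = update or {}  # tolerate a node that returned nothing
        keys = ", ".join(sorted(update.keys()))
        parts.append(f"{node_name}: updated [{keys}]")
    return "; ".join(parts)


# Hand-written events in the shape described above (not real output).
sample_events = [
    {"retrieve": {"documents": ["doc1", "doc2", "doc3"], "retry_count": 0}},
    {"grade": {"documents": ["doc1"]}},
    {"generate": {"generation": "..."}},
]

for event in sample_events:
    print(summarize_event(event))
```

In the streaming loop above, `print(event)` would become `print(summarize_event(event))`.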

Enable LangSmith tracing:

import os
os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = "your-key"

LangSmith captures the full execution trace, including LLM calls, retrieval results, and routing decisions.


Enhancements & Advanced Patterns


Adaptive RAG: Dynamic Routing

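Add a routing function at the entry point to decide between vector search and a direct LLM response. It returns the name of the next node rather than a state update, so register it with add_conditional_edges from START (in place of set_entry_point) and add a direct_answer node that calls the LLM without retrieved context: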

def route_question(state: GraphState):
    question = state["question"]
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    prompt = f"""Does this question require looking up specific documentation, or can it be answered with general knowledge?

Question: {question}

Answer 'documentation' or 'general' only."""
    
    response = llm.invoke(prompt)
    
    if "general" in response.content.lower():
        return "direct_answer"
    return "retrieve"

This prevents unnecessary retrieval for simple questions like "What is REST?" while routing technical questions through the full pipeline.
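
Routing decisions like this are easier to test when the LLM call is passed in rather than created inside the function. The sketch below shows one way to do that; `make_router` and `fake_classify` are hypothetical names, and the fake classifier stands in for the LLM call so the example runs offline.

```python
from typing import Callable


def make_router(classify: Callable[[str], str]) -> Callable[[dict], str]:
    """Build a routing function around any callable that labels a question."""

    def route_question(state: dict) -> str:
        label = classify(state["question"]).strip().lower()
        # Default to retrieval unless the classifier clearly says "general".
        return "direct_answer" if label.startswith("general") else "retrieve"

    return route_question


def fake_classify(question: str) -> str:
    """Stand-in for the LLM call: treats 'What is ...' as general knowledge."""
    return "general" if question.lower().startswith("what is") else "documentation"


router = make_router(fake_classify)
print(router({"question": "What is REST?"}))
print(router({"question": "How do I rotate my API token?"}))
```

In the real graph, `classify` would wrap the prompt and `llm.invoke` call shown above and return `response.content`.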


Corrective & Self-Reflective RAG

Add validation after generation:

def validate_answer(state: GraphState):
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    
    docs_text = "\n".join([doc.page_content for doc in state["documents"]])
    
    prompt = f"""Does this answer contain information not supported by the context?

Context: {docs_text[:1000]}...

Answer: {state['generation']}

Respond 'yes' or 'no' only."""
    
    response = llm.invoke(prompt)
    
    if "yes" in response.content.lower():
        return {"generation": "I cannot answer this based on the available documentation."}
    return {}

Fallback to Web Search

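Add a web search node for questions outside your documentation. This example uses Tavily, which requires the langchain-community and tavily-python packages and a TAVILY_API_KEY environment variable: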

from langchain_community.tools.tavily_search import TavilySearchResults

def web_search_node(state: GraphState):
    """Search the web when internal docs are insufficient."""
    search = TavilySearchResults(max_results=3)
    results = search.invoke(state["question"])
    
    docs = [
        Document(page_content=r["content"], metadata={"source": r["url"]})
        for r in results
    ]
    return {"documents": docs}

Route to this node when grading fails twice in a row.
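
A routing function for that fallback could look like the sketch below. It assumes that "fails twice" means grading returned no relevant documents on the original query and again after one rewrite, and that `retry_count` counts rewrites as in the rewrite node above. `decide_with_fallback` and `MAX_GRADING_FAILURES` are hypothetical names.

```python
MAX_GRADING_FAILURES = 2  # assumption: original query plus one rewrite


def decide_with_fallback(state: dict) -> str:
    """Pick the next node: generate, rewrite, or fall back to web search."""
    if state["documents"]:
        return "generate"
    # retry_count counts rewrites so far, so an empty result at this point
    # is grading failure number retry_count + 1.
    failures = state["retry_count"] + 1
    return "web_search" if failures >= MAX_GRADING_FAILURES else "rewrite"


print(decide_with_fallback({"documents": ["doc"], "retry_count": 0}))
print(decide_with_fallback({"documents": [], "retry_count": 0}))
print(decide_with_fallback({"documents": [], "retry_count": 1}))
```

To use it, register the web search node, add "web_search" to the mapping passed to add_conditional_edges, and connect the web search node to the generation node with a normal edge.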


Memory & State Persistence

Save conversation state across sessions:

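import sqlite3
from langgraph.checkpoint.sqlite import SqliteSaver  # pip install langgraph-checkpoint-sqlite

# In recent releases from_conn_string is a context manager, so pass a connection directly.
# check_same_thread=False lets the checkpointer use the connection from other threads.
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
memory = SqliteSaver(conn)
app = workflow.compile(checkpointer=memory)

# Use with thread_id
config = {"configurable": {"thread_id": "user_123"}}
initial_state = {"question": "How do I authenticate with the API?", "documents": [], "generation": "", "retry_count": 0}
result = app.invoke(initial_state, config=config)

Checkpoints are stored per thread_id, so a later call with the same thread_id picks up the saved state. This is the basis for multi-turn conversations.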


Evaluation & Observability


Metrics & Quality Checks

Track retrieval precision and recall:

def evaluate_retrieval(question, retrieved_docs, ground_truth_docs):
    """Compare retrieved sources against a labeled set of relevant sources."""
    retrieved_ids = {doc.metadata.get("source") for doc in retrieved_docs}
    ground_truth_ids = {doc.metadata.get("source") for doc in ground_truth_docs}
    
    # Guard both denominators to avoid ZeroDivisionError on empty inputs
    if not retrieved_ids or not ground_truth_ids:
        return {"precision": 0, "recall": 0}
    
    hits = len(retrieved_ids & ground_truth_ids)
    precision = hits / len(retrieved_ids)
    recall = hits / len(ground_truth_ids)
    
    return {"precision": precision, "recall": recall}
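
As a worked example of the arithmetic, suppose the retriever returns three of the sample sources for an authentication question and two of them are actually relevant. The ground-truth labels here are a hypothetical hand-labeled set, used only to show the calculation.

```python
# Sources returned by the retriever for one question
retrieved = {"auth_guide", "endpoints", "rate_limits"}
# Hypothetical hand-labeled set of sources that really answer it
relevant = {"auth_guide", "endpoints"}

hits = retrieved & relevant
precision = len(hits) / len(retrieved)  # 2 of 3 retrieved sources are relevant
recall = len(hits) / len(relevant)      # 2 of 2 relevant sources were retrieved

print(f"precision={precision:.2f} recall={recall:.2f}")
# → precision=0.67 recall=1.00
```

High recall with lower precision, as here, means the grading step has irrelevant documents to filter out but nothing important was missed.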

Grade generation quality:

def evaluate_generation(question, answer, context):
    llm = ChatOpenAI(model="gpt-4o", temperature=0)
    
    prompt = f"""Rate this answer (1-5) for accuracy, completeness, and conciseness.

Question: {question}
Context: {context[:500]}...
Answer: {answer}

Provide scores as: accuracy=X, completeness=Y, conciseness=Z"""
    
    response = llm.invoke(prompt)
    return response.content
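
The function above returns the model's reply as free text. To aggregate scores across many questions, the reply has to be parsed into numbers. A minimal sketch, assuming the model follows the requested `accuracy=X, completeness=Y, conciseness=Z` format; `parse_scores` is a hypothetical helper.

```python
import re
from typing import Dict


def parse_scores(text: str) -> Dict[str, int]:
    """Extract 'name=N' pairs (N from 1 to 5) for the three rubric fields."""
    fields = ("accuracy", "completeness", "conciseness")
    scores: Dict[str, int] = {}
    for name, value in re.findall(r"(\w+)\s*=\s*([1-5])\b", text.lower()):
        if name in fields:
            scores[name] = int(value)
    return scores


print(parse_scores("accuracy=5, completeness=4, conciseness=3"))
# → {'accuracy': 5, 'completeness': 4, 'conciseness': 3}
```

Fields that are missing or out of range are simply absent from the result, so callers can detect an unusable reply by checking that all three keys are present.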

Tracing with LangSmith

Enable tracing:

export LANGSMITH_TRACING=true
export LANGSMITH_API_KEY="your-key"

LangSmith visualizes graph execution, showing exact prompts, retrieved documents, and routing decisions in a web UI.


Troubleshooting & Common Pitfalls


Incorrect Routing & Loops

Prevent infinite loops with explicit exit conditions:

def decide_next_step(state: GraphState):
    if state["retry_count"] >= 2:
        return "generate"  # Force exit
    elif len(state["documents"]) > 0:
        return "generate"
    else:
        return "rewrite"

Poor Retrieval & Irrelevant Results

If retrieval consistently fails, adjust your chunking strategy. For technical documentation, preserve complete sections:

from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=100,
    separators=["\n\n", "\n", ". ", " ", ""]
)

LLM Hallucinations & Filtering

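Cross-reference answers with retrieved documents using semantic similarity. This is a coarse signal: a high score shows topical overlap with a document, not that every claim in the answer is supported.

from numpy import dot
from numpy.linalg import norm

def cosine_similarity(a, b):
    return dot(a, b) / (norm(a) * norm(b))

def check_hallucination(answer, documents):
    """Return True when the answer is close to at least one document (likely grounded)."""
    if not documents:
        return False  # nothing to ground the answer in
    
    embeddings = OpenAIEmbeddings()
    
    answer_emb = embeddings.embed_query(answer)
    doc_embs = embeddings.embed_documents([d.page_content for d in documents])
    
    similarities = [cosine_similarity(answer_emb, doc_emb) for doc_emb in doc_embs]
    return max(similarities) > 0.7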

Next Steps

LangGraph RAG is useful for more than just a Q&A system. Potential applications include:


  • Customer support: Route queries to humans when confidence is low.

  • Code documentation assistants: Pull from docs and escalate to GitHub issues when information is missing.

  • Research assistants: Iterate across multiple queries to gather comprehensive information.

  • Multi-modal RAG: Combine text, code, and image retrieval in a single workflow.


Graphs can be enhanced with additional nodes for citation extraction, fact verification, or query decomposition. Multiple retrievers, such as vector stores, web search, and SQL databases, can be combined in one graph to handle more complex workflows.


You can also connect with our experts for personalized consultation or support in building and optimizing your LangGraph RAG workflows.


Frequently Asked Questions

How much does LangGraph RAG cost compared to OpenAI Assistants API?

LangGraph is open source and free to use. You pay for LLM API calls (OpenAI, Anthropic) and vector database usage. The OpenAI Assistants API bills model tokens plus fees for built-in tools such as file search. With LangGraph, you can control costs by using a cheaper model (gpt-4o-mini) for grading and a more expensive one (gpt-4o) only for generation. Self-hosting the vector store avoids managed retrieval fees.

Can I migrate my existing LangChain RAG pipeline to LangGraph?

Yes. Wrap each chain step in a node function. Replace sequential logic with graph edges. Reuse existing retrievers, embeddings, and memory components without modification. Migration typically takes a few hours. The main work is defining state and converting conditional logic into routing functions.

What are the latency benchmarks for LangGraph RAG vs traditional RAG?

LangGraph adds minimal overhead (under 50ms) for graph coordination. Real latency comes from LLM calls and retrieval. Adaptive routing can reduce latency by skipping unnecessary steps. Self-reflective patterns increase latency due to validation calls but improve quality. Expect 2-5 seconds per query depending on your LLM provider.
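
Rather than relying on general figures, you can measure where the time goes in your own graph. The sketch below wraps a node function in a timing decorator; `timed`, `timings`, and `fake_retrieve` are hypothetical names, and the sleep stands in for a real retrieval or LLM call.

```python
import time
from functools import wraps
from typing import Callable, Dict, List

# Collected durations in seconds, keyed by node function name
timings: Dict[str, List[float]] = {}


def timed(node: Callable[[dict], dict]) -> Callable[[dict], dict]:
    """Wrap a node so each call records its wall-clock duration."""

    @wraps(node)
    def wrapper(state: dict) -> dict:
        start = time.perf_counter()
        try:
            return node(state)
        finally:
            elapsed = time.perf_counter() - start
            timings.setdefault(node.__name__, []).append(elapsed)

    return wrapper


@timed
def fake_retrieve(state: dict) -> dict:
    time.sleep(0.01)  # stand-in for a vector store query
    return {"documents": ["doc"]}


result = fake_retrieve({"question": "q"})
print(result, f"{timings['fake_retrieve'][0] * 1000:.1f} ms")
```

Applied to the graph in this guide, a node would be registered as `workflow.add_node("retrieve", timed(retrieve_node))`, leaving the node body unchanged.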

Does LangGraph RAG work with Azure OpenAI and AWS Bedrock?

Yes. LangGraph supports any LLM provider with a LangChain integration:

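from langchain_openai import AzureChatOpenAI

# The API key is read from the AZURE_OPENAI_API_KEY environment variable
llm = AzureChatOpenAI(
    azure_endpoint="https://your-endpoint.openai.azure.com/",
    api_version="2024-02-01",
    azure_deployment="your-gpt-4o-deployment"  # your Azure deployment name, not the model family
)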

Or for Bedrock:

from langchain_aws import ChatBedrock  # pip install langchain-aws (replaces the deprecated BedrockChat)

llm = ChatBedrock(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    region_name="us-east-1"
)

Graph logic remains identical regardless of provider.

How do I deploy LangGraph RAG to production on Kubernetes?

Containerize with a Dockerfile:

FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "app.py"]

Add health checks for Kubernetes probes:

from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
def health():
    return {"status": "healthy"}

@app.post("/query")
def query(q: str):
    result = graph.invoke({"question": q, ...})
    return result

Use Helm for deployment. Integrate Prometheus for metrics. Use persistent checkpointers (PostgreSQL or Redis) instead of SQLite for distributed deployments. Run multiple replicas behind a load balancer for horizontal scaling.

