作者:安娜的信息安全说
1. 什么是 LangGraph?
LangGraph 是一个开源框架,旨在帮助开发者使用语言模型(LLM)构建复杂的、有状态的应用程序。它的核心理念是将应用逻辑表示为 图(Graph),其中节点代表操作或任务,边表示任务之间的依赖关系。这种方式非常适合处理多步骤推理、分支逻辑以及需要记忆状态的场景。
2. 为什么选择 LangGraph?
传统的 LLM 应用通常是线性的:输入 → 模型 → 输出。但在实际业务中,很多任务并不是单一的,而是包含多个步骤、条件分支甚至循环。LangGraph 的优势在于:
图结构灵活性:可以轻松表示复杂的工作流。状态管理:支持在节点之间传递和持久化状态。可扩展性:可以集成外部 API、数据库、工具等。异步执行:提高性能,适合大规模应用。
3. 核心概念
节点(Node):每个节点是一个操作,可以是调用 LLM、执行代码、访问 API 等。边(Edge):定义节点之间的依赖关系。状态(State):在图中流动的数据,可以是用户输入、模型输出或中间结果。执行器(Executor):负责调度和运行整个图。
4. 应用场景
多步骤问答系统:例如法律咨询,需要先解析问题,再检索法规,最后生成答案。智能客服工作流:根据用户意图动态选择处理路径。数据处理管道:结合 LLM 和传统算法,完成复杂的数据清洗和分析。自动化决策系统:在金融、医疗等领域,结合规则和模型进行决策。
5. 代码示例
1. 安装与基础准备
- pip install -U langgraph langchain openai tiktoken
- # 如果用 Anthropic/DeepSeek 等模型,请安装各自 SDK
复制代码说明:LangGraph 与多数 LLM/工具生态兼容,这里以 LangChain 的 ChatOpenAI 作为示例(你可以替换为任意支持的模型)。
2. 入门示例:三步工作流(分析 → 检索 → 生成)
这个例子演示一个最小化图:把用户问题解析为检索查询,再进行(模拟)检索,最后生成答案。- # 语言:Python 3.10+
- from typing import TypedDict, List
- from langgraph.graph import StateGraph, END
- # 1) 定义有状态数据结构(在节点之间传递)
- class State(TypedDict):
- question: str
- query: str
- docs: List[str]
- answer: str
- # 2) 定义节点函数
- def analyze_question(state: State) -> State:
- q = state["question"]
- # 简单规则:提取关键词(真实场景可用 LLM)
- state["query"] = q.replace("请", "").replace("怎么", "").strip()
- return state
- def search_docs(state: State) -> State:
- # 这里用模拟数据,真实场景你可调用搜索 API 或数据库
- query = state["query"]
- state["docs"] = [
- f"文档1:与『{query}』相关的背景知识。",
- f"文档2:『{query}』的常见实践。",
- ]
- return state
- def synthesize_answer(state: State) -> State:
- docs = state["docs"]
- question = state["question"]
- combined = "\n".join(docs)
- state["answer"] = f"问题:{question}\n——基于检索到的信息——\n{combined}\n\n结论:建议按步骤执行。"
- return state
- # 3) 构建图
- graph = StateGraph(State)
- graph.add_node("analyze", analyze_question)
- graph.add_node("search", search_docs)
- graph.add_node("answer", synthesize_answer)
- # 定义边
- graph.set_entry_point("analyze")
- graph.add_edge("analyze", "search")
- graph.add_edge("search", "answer")
- graph.add_edge("answer", END)
- # 4) 编译并运行
- app = graph.compile()
- # 5) 执行一次
- if __name__ == "__main__":
- result = app.invoke({"question": "请解释怎么用 LangGraph 构建复杂工作流?"})
- print("最终答案:\n", result["answer"])
复制代码 3. 接入 LLM 与工具调用(Agent 风格)
下面示例展示:一个节点用 LLM 进行意图识别,另一个节点根据意图调用工具(如计算器),最后再用 LLM 生成总结。- from typing import TypedDict, Literal, Dict, Any
- from langgraph.graph import StateGraph, END
- from langchain_openai import ChatOpenAI
- llm = ChatOpenAI(model="gpt-4o-mini") # 任选模型
- class AgentState(TypedDict):
- user_input: str
- intent: Literal["math", "search", "chat"]
- tool_result: str
- final: str
- def classify_intent(state: AgentState) -> AgentState:
- prompt = f"用户输入:{state['user_input']}\n判断意图:math/search/chat(三选一,直接输出关键词)"
- intent = llm.invoke(prompt).content.strip().lower()
- if intent not in {"math", "search", "chat"}:
- intent = "chat"
- state["intent"] = intent # type: ignore
- return state
- def tool_router(state: AgentState) -> str:
- # 动态选择下一个节点
- return state["intent"]
- def math_tool(state: AgentState) -> AgentState:
- text = state["user_input"]
- # 很简单的表达式解析(真实场景可用安全的解析库)
- try:
- expr = text.replace("计算", "").strip()
- value = eval(expr, {"__builtins__": {}})
- state["tool_result"] = f"计算结果:{value}"
- except Exception as e:
- state["tool_result"] = f"无法计算表达式:{e}"
- return state
- def search_tool(state: AgentState) -> AgentState:
- q = state["user_input"]
- # 模拟搜索
- state["tool_result"] = f"搜索:找到与『{q}』相关的3条结果(模拟)"
- return state
- def chat_fallback(state: AgentState) -> AgentState:
- # 无工具,直接返回占位
- state["tool_result"] = "普通对话,无需工具。"
- return state
- def summarize(state: AgentState) -> AgentState:
- prompt = f"""你是助手。
- 用户输入:{state['user_input']}
- 工具结果:{state['tool_result']}
- 请用简洁中文总结并给出下一步建议。"""
- state["final"] = llm.invoke(prompt).content
- return state
- graph = StateGraph(AgentState)
- graph.add_node("classify", classify_intent)
- graph.add_node("math", math_tool)
- graph.add_node("search", search_tool)
- graph.add_node("chat", chat_fallback)
- graph.add_node("summarize", summarize)
- graph.set_entry_point("classify")
- # 动态路由:从 classify 到 math/search/chat
- graph.add_conditional_edges("classify", tool_router, {
- "math": "math",
- "search": "search",
- "chat": "chat"
- })
- # 三条路径均指向 summarize
- graph.add_edge("math", "summarize")
- graph.add_edge("search", "summarize")
- graph.add_edge("chat", "summarize")
- graph.add_edge("summarize", END)
- app = graph.compile()
- if __name__ == "__main__":
- print(app.invoke({"user_input": "计算 21*2+3"}))
- print(app.invoke({"user_input": "LangGraph 最新文档在哪?"}))
- print(app.invoke({"user_input": "你好,今天心情不错"}))
复制代码 4. 分支与循环(包含重试/校验)
这个示例引入一个验证节点,如果答案不合格则回到生成节点重试(形成一个小环)。- from typing import TypedDict
- from langgraph.graph import StateGraph, END
- class LoopState(TypedDict):
- draft: str
- tries: int
- ok: bool
- def draft_answer(state: LoopState) -> LoopState:
- state["draft"] = f"草稿 v{state['tries']+1}: 内容待完善。"
- state["tries"] += 1
- return state
- def validate(state: LoopState) -> LoopState:
- # 简单规则:尝试次数 >= 2 就认为合格
- state["ok"] = state["tries"] >= 2
- return state
- def router(state: LoopState) -> str:
- return "end" if state["ok"] else "again"
- graph = StateGraph(LoopState)
- graph.add_node("draft", draft_answer)
- graph.add_node("validate", validate)
- graph.set_entry_point("draft")
- graph.add_edge("draft", "validate")
- graph.add_conditional_edges("validate", router, {
- "again": "draft",
- "end": END
- })
- app = graph.compile()
- if __name__ == "__main__":
- result = app.invoke({"draft": "", "tries": 0, "ok": False})
- print(result)
复制代码 5. 状态持久化与检查点(内置 Checkpointer)
当你需要 多轮对话 或 长链路恢复,可以使用检查点保存状态。下面用内存检查点(生产建议用 SQLite/Postgres 等持久化)。- from typing import TypedDict
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import StateGraph, END
- class ChatState(TypedDict):
- history: list
- turn: str
- def add_turn(state: ChatState) -> ChatState:
- state["history"].append(state["turn"])
- state["turn"] = ""
- return state
- graph = StateGraph(ChatState)
- graph.add_node("record", add_turn)
- graph.set_entry_point("record")
- graph.add_edge("record", END)
- checkpointer = MemorySaver()
- app = graph.compile(checkpointer=checkpointer)
- if __name__ == "__main__":
- # thread_id 标识会话
- cfg = {"configurable": {"thread_id": "user_123"}}
- print(app.invoke({"history": [], "turn": "你好"}, cfg))
- print(app.invoke({"history": [], "turn": "今天的天气"}, cfg))
- # 第二次调用会在同一个 thread_id 上追加历史
复制代码 要点:
checkpointer=MemorySaver() 启用检查点。configurable.thread_id 用来标识不同会话。下一次调用无需携带完整历史,LangGraph 会自动从检查点恢复并合并状态。
6. 多代理协作(Reviewer + Coder)
示例:两个代理通过有向图协作,一个写代码,一个做评审,不合格则返回修改。- from typing import TypedDict
- from langgraph.graph import StateGraph, END
- from langchain_openai import ChatOpenAI
- coder_llm = ChatOpenAI(model="gpt-4o-mini")
- reviewer_llm = ChatOpenAI(model="gpt-4o-mini")
- class PRState(TypedDict):
- task: str
- code: str
- review: str
- approved: bool
- def coder(state: PRState) -> PRState:
- prompt = f"需求:{state['task']} 请编写简洁、可运行的 Python 示例。"
- state["code"] = coder_llm.invoke(prompt).content
- return state
- def reviewer(state: PRState) -> PRState:
- prompt = f"请评审以下代码并指出问题(若通过请写:APPROVED):\n{state['code']}"
- review = reviewer_llm.invoke(prompt).content
- state["review"] = review
- state["approved"] = "APPROVED" in review.upper()
- return state
- def route(state: PRState) -> str:
- return "end" if state["approved"] else "fix"
- def fix_code(state: PRState) -> PRState:
- prompt = f"评审意见:{state['review']}\n请据此修复代码:\n{state['code']}"
- state["code"] = coder_llm.invoke(prompt).content
- return state
- graph = StateGraph(PRState)
- graph.add_node("coder", coder)
- graph.add_node("reviewer", reviewer)
- graph.add_node("fix", fix_code)
- graph.set_entry_point("coder")
- graph.add_edge("coder", "reviewer")
- graph.add_conditional_edges("reviewer", route, {"end": END, "fix": "fix"})
- graph.add_edge("fix", "reviewer")
- app = graph.compile()
- if __name__ == "__main__":
- res = app.invoke({"task": "实现一个斐波那契数列函数", "code": "", "review": "", "approved": False})
- print("最终代码:\n", res["code"])
- print("评审结果:\n", res["review"])
复制代码 7. 流式输出与异步(边跑边看)
当节点生成较长文本时,你可能希望将中间 token/内容流式推送到前端。LangGraph 支持事件流。下面是一个简单的 streaming 示例。- import asyncio
- from typing import TypedDict
- from langgraph.graph import StateGraph, END
- from langchain_openai import ChatOpenAI
- llm = ChatOpenAI(model="gpt-4o-mini", streaming=True) # 开启流式
- class StreamState(TypedDict):
- prompt: str
- output: str
- async def gen(state: StreamState) -> StreamState:
- chunks = []
- async for event in llm.astream(state["prompt"]):
- # event 是增量块,具体取决于 SDK
- chunks.append(event.content)
- # 这里可将 event 推送到前端
- state["output"] = "".join(chunks)
- return state
- graph = StateGraph(StreamState)
- graph.add_node("gen", gen)
- graph.set_entry_point("gen")
- graph.add_edge("gen", END)
- app = graph.compile()
- async def main():
- result = await app.ainvoke({"prompt": "请分点说明 LangGraph 的优势"})
- print(result["output"])
- if __name__ == "__main__":
- asyncio.run(main())
复制代码提示:不同 LLM SDK 的流式接口略有差异,请按你选用的 SDK 文档调整(如 astream_events() / on_token() 回调)。
8. 最佳实践建议
状态结构要清晰:使用 TypedDict/Dataclass 明确字段含义,保证节点输入/输出一致。节点纯函数化:尽量让节点只根据输入状态产生输出,方便测试与复用。路由逻辑收敛:复杂条件用单独的路由函数,避免在节点里掺杂太多分支。可观察性:为关键节点加日志/事件推送,便于调试与性能监控。持久化:生产环境使用数据库型 Checkpointer(如 SQLite、Postgres),保证恢复正确。异常与重试:配合循环或重试节点处理不稳定外部依赖(API、工具)。
原文地址:https://blog.csdn.net/nielilijy/article/details/156396469 |