开启左侧

LangGraph:构建智能工作流的新方式

[复制链接]
米落枫 发表于 3 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:安娜的信息安全说
1. 什么是 LangGraph?

LangGraph 是一个开源框架,旨在帮助开发者使用语言模型(LLM)构建复杂的、有状态的应用程序。它的核心理念是将应用逻辑表示为 图(Graph),其中节点代表操作或任务,边表示任务之间的依赖关系。这种方式非常适合处理多步骤推理、分支逻辑以及需要记忆状态的场景。
2. 为什么选择 LangGraph?

传统的 LLM 应用通常是线性的:输入 → 模型 → 输出。但在实际业务中,很多任务并不是单一的,而是包含多个步骤、条件分支甚至循环。LangGraph 的优势在于:
    图结构灵活性:可以轻松表示复杂的工作流。状态管理:支持在节点之间传递和持久化状态。可扩展性:可以集成外部 API、数据库、工具等。异步执行:提高性能,适合大规模应用。
3. 核心概念

    节点(Node):每个节点是一个操作,可以是调用 LLM、执行代码、访问 API 等。边(Edge):定义节点之间的依赖关系。状态(State):在图中流动的数据,可以是用户输入、模型输出或中间结果。执行器(Executor):负责调度和运行整个图。
4. 应用场景

    多步骤问答系统:例如法律咨询,需要先解析问题,再检索法规,最后生成答案。智能客服工作流:根据用户意图动态选择处理路径。数据处理管道:结合 LLM 和传统算法,完成复杂的数据清洗和分析。自动化决策系统:在金融、医疗等领域,结合规则和模型进行决策。
5. 代码示例

1. 安装与基础准备
  1. pip install -U langgraph langchain openai tiktoken
  2. # 如果用 Anthropic/DeepSeek 等模型,请安装各自 SDK
复制代码
说明:LangGraph 与多数 LLM/工具生态兼容,这里以 LangChain 的 ChatOpenAI 作为示例(你可以替换为任意支持的模型)。
2. 入门示例:三步工作流(分析 → 检索 → 生成)

这个例子演示一个最小化图:把用户问题解析为检索查询,再进行(模拟)检索,最后生成答案。
  1. # 语言:Python 3.10+
  2. from typing import TypedDict, List
  3. from langgraph.graph import StateGraph, END
  4. # 1) 定义有状态数据结构(在节点之间传递)
  5. class State(TypedDict):
  6.     question: str
  7.     query: str
  8.     docs: List[str]
  9.     answer: str
  10. # 2) 定义节点函数
  11. def analyze_question(state: State) -> State:
  12.     q = state["question"]
  13.     # 简单规则:提取关键词(真实场景可用 LLM)
  14.     state["query"] = q.replace("请", "").replace("怎么", "").strip()
  15.     return state
  16. def search_docs(state: State) -> State:
  17.     # 这里用模拟数据,真实场景你可调用搜索 API 或数据库
  18.     query = state["query"]
  19.     state["docs"] = [
  20.         f"文档1:与『{query}』相关的背景知识。",
  21.         f"文档2:『{query}』的常见实践。",
  22.     ]
  23.     return state
  24. def synthesize_answer(state: State) -> State:
  25.     docs = state["docs"]
  26.     question = state["question"]
  27.     combined = "\n".join(docs)
  28.     state["answer"] = f"问题:{question}\n——基于检索到的信息——\n{combined}\n\n结论:建议按步骤执行。"
  29.     return state
  30. # 3) 构建图
  31. graph = StateGraph(State)
  32. graph.add_node("analyze", analyze_question)
  33. graph.add_node("search", search_docs)
  34. graph.add_node("answer", synthesize_answer)
  35. # 定义边
  36. graph.set_entry_point("analyze")
  37. graph.add_edge("analyze", "search")
  38. graph.add_edge("search", "answer")
  39. graph.add_edge("answer", END)
  40. # 4) 编译并运行
  41. app = graph.compile()
  42. # 5) 执行一次
  43. if __name__ == "__main__":
  44.     result = app.invoke({"question": "请解释怎么用 LangGraph 构建复杂工作流?"})
  45.     print("最终答案:\n", result["answer"])
复制代码
3. 接入 LLM 与工具调用(Agent 风格)

下面示例展示:一个节点用 LLM 进行意图识别,另一个节点根据意图调用工具(如计算器),最后再用 LLM 生成总结。
  1. from typing import TypedDict, Literal, Dict, Any
  2. from langgraph.graph import StateGraph, END
  3. from langchain_openai import ChatOpenAI
  4. llm = ChatOpenAI(model="gpt-4o-mini")  # 任选模型
  5. class AgentState(TypedDict):
  6.     user_input: str
  7.     intent: Literal["math", "search", "chat"]
  8.     tool_result: str
  9.     final: str
  10. def classify_intent(state: AgentState) -> AgentState:
  11.     prompt = f"用户输入:{state['user_input']}\n判断意图:math/search/chat(三选一,直接输出关键词)"
  12.     intent = llm.invoke(prompt).content.strip().lower()
  13.     if intent not in {"math", "search", "chat"}:
  14.         intent = "chat"
  15.     state["intent"] = intent  # type: ignore
  16.     return state
  17. def tool_router(state: AgentState) -> str:
  18.     # 动态选择下一个节点
  19.     return state["intent"]
  20. def math_tool(state: AgentState) -> AgentState:
  21.     text = state["user_input"]
  22.     # 很简单的表达式解析(真实场景可用安全的解析库)
  23.     try:
  24.         expr = text.replace("计算", "").strip()
  25.         value = eval(expr, {"__builtins__": {}})
  26.         state["tool_result"] = f"计算结果:{value}"
  27.     except Exception as e:
  28.         state["tool_result"] = f"无法计算表达式:{e}"
  29.     return state
  30. def search_tool(state: AgentState) -> AgentState:
  31.     q = state["user_input"]
  32.     # 模拟搜索
  33.     state["tool_result"] = f"搜索:找到与『{q}』相关的3条结果(模拟)"
  34.     return state
  35. def chat_fallback(state: AgentState) -> AgentState:
  36.     # 无工具,直接返回占位
  37.     state["tool_result"] = "普通对话,无需工具。"
  38.     return state
  39. def summarize(state: AgentState) -> AgentState:
  40.     prompt = f"""你是助手。
  41. 用户输入:{state['user_input']}
  42. 工具结果:{state['tool_result']}
  43. 请用简洁中文总结并给出下一步建议。"""
  44.     state["final"] = llm.invoke(prompt).content
  45.     return state
  46. graph = StateGraph(AgentState)
  47. graph.add_node("classify", classify_intent)
  48. graph.add_node("math", math_tool)
  49. graph.add_node("search", search_tool)
  50. graph.add_node("chat", chat_fallback)
  51. graph.add_node("summarize", summarize)
  52. graph.set_entry_point("classify")
  53. # 动态路由:从 classify 到 math/search/chat
  54. graph.add_conditional_edges("classify", tool_router, {
  55.     "math": "math",
  56.     "search": "search",
  57.     "chat": "chat"
  58. })
  59. # 三条路径均指向 summarize
  60. graph.add_edge("math", "summarize")
  61. graph.add_edge("search", "summarize")
  62. graph.add_edge("chat", "summarize")
  63. graph.add_edge("summarize", END)
  64. app = graph.compile()
  65. if __name__ == "__main__":
  66.     print(app.invoke({"user_input": "计算 21*2+3"}))
  67.     print(app.invoke({"user_input": "LangGraph 最新文档在哪?"}))
  68.     print(app.invoke({"user_input": "你好,今天心情不错"}))
复制代码
4. 分支与循环(包含重试/校验)

这个示例引入一个验证节点,如果答案不合格则回到生成节点重试(形成一个小环)。
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph, END
  3. class LoopState(TypedDict):
  4.     draft: str
  5.     tries: int
  6.     ok: bool
  7. def draft_answer(state: LoopState) -> LoopState:
  8.     state["draft"] = f"草稿 v{state['tries']+1}: 内容待完善。"
  9.     state["tries"] += 1
  10.     return state
  11. def validate(state: LoopState) -> LoopState:
  12.     # 简单规则:尝试次数 >= 2 就认为合格
  13.     state["ok"] = state["tries"] >= 2
  14.     return state
  15. def router(state: LoopState) -> str:
  16.     return "end" if state["ok"] else "again"
  17. graph = StateGraph(LoopState)
  18. graph.add_node("draft", draft_answer)
  19. graph.add_node("validate", validate)
  20. graph.set_entry_point("draft")
  21. graph.add_edge("draft", "validate")
  22. graph.add_conditional_edges("validate", router, {
  23.     "again": "draft",
  24.     "end": END
  25. })
  26. app = graph.compile()
  27. if __name__ == "__main__":
  28.     result = app.invoke({"draft": "", "tries": 0, "ok": False})
  29.     print(result)
复制代码
5. 状态持久化与检查点(内置 Checkpointer)

当你需要 多轮对话长链路恢复,可以使用检查点保存状态。下面用内存检查点(生产建议用 SQLite/Postgres 等持久化)。
  1. from typing import TypedDict
  2. from langgraph.checkpoint.memory import MemorySaver
  3. from langgraph.graph import StateGraph, END
  4. class ChatState(TypedDict):
  5.     history: list
  6.     turn: str
  7. def add_turn(state: ChatState) -> ChatState:
  8.     state["history"].append(state["turn"])
  9.     state["turn"] = ""
  10.     return state
  11. graph = StateGraph(ChatState)
  12. graph.add_node("record", add_turn)
  13. graph.set_entry_point("record")
  14. graph.add_edge("record", END)
  15. checkpointer = MemorySaver()
  16. app = graph.compile(checkpointer=checkpointer)
  17. if __name__ == "__main__":
  18.     # thread_id 标识会话
  19.     cfg = {"configurable": {"thread_id": "user_123"}}
  20.     print(app.invoke({"history": [], "turn": "你好"}, cfg))
  21.     print(app.invoke({"history": [], "turn": "今天的天气"}, cfg))
  22.     # 第二次调用会在同一个 thread_id 上追加历史
复制代码
要点:
    checkpointer=MemorySaver() 启用检查点。configurable.thread_id 用来标识不同会话。下一次调用无需携带完整历史,LangGraph 会自动从检查点恢复并合并状态。

6. 多代理协作(Reviewer + Coder)

示例:两个代理通过有向图协作,一个写代码,一个做评审,不合格则返回修改。
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph, END
  3. from langchain_openai import ChatOpenAI
  4. coder_llm = ChatOpenAI(model="gpt-4o-mini")
  5. reviewer_llm = ChatOpenAI(model="gpt-4o-mini")
  6. class PRState(TypedDict):
  7.     task: str
  8.     code: str
  9.     review: str
  10.     approved: bool
  11. def coder(state: PRState) -> PRState:
  12.     prompt = f"需求:{state['task']} 请编写简洁、可运行的 Python 示例。"
  13.     state["code"] = coder_llm.invoke(prompt).content
  14.     return state
  15. def reviewer(state: PRState) -> PRState:
  16.     prompt = f"请评审以下代码并指出问题(若通过请写:APPROVED):\n{state['code']}"
  17.     review = reviewer_llm.invoke(prompt).content
  18.     state["review"] = review
  19.     state["approved"] = "APPROVED" in review.upper()
  20.     return state
  21. def route(state: PRState) -> str:
  22.     return "end" if state["approved"] else "fix"
  23. def fix_code(state: PRState) -> PRState:
  24.     prompt = f"评审意见:{state['review']}\n请据此修复代码:\n{state['code']}"
  25.     state["code"] = coder_llm.invoke(prompt).content
  26.     return state
  27. graph = StateGraph(PRState)
  28. graph.add_node("coder", coder)
  29. graph.add_node("reviewer", reviewer)
  30. graph.add_node("fix", fix_code)
  31. graph.set_entry_point("coder")
  32. graph.add_edge("coder", "reviewer")
  33. graph.add_conditional_edges("reviewer", route, {"end": END, "fix": "fix"})
  34. graph.add_edge("fix", "reviewer")
  35. app = graph.compile()
  36. if __name__ == "__main__":
  37.     res = app.invoke({"task": "实现一个斐波那契数列函数", "code": "", "review": "", "approved": False})
  38.     print("最终代码:\n", res["code"])
  39.     print("评审结果:\n", res["review"])
复制代码
7. 流式输出与异步(边跑边看)

当节点生成较长文本时,你可能希望将中间 token/内容流式推送到前端。LangGraph 支持事件流。下面是一个简单的 streaming 示例。
  1. import asyncio
  2. from typing import TypedDict
  3. from langgraph.graph import StateGraph, END
  4. from langchain_openai import ChatOpenAI
  5. llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)  # 开启流式
  6. class StreamState(TypedDict):
  7.     prompt: str
  8.     output: str
  9. async def gen(state: StreamState) -> StreamState:
  10.     chunks = []
  11.     async for event in llm.astream(state["prompt"]):
  12.         # event 是增量块,具体取决于 SDK
  13.         chunks.append(event.content)
  14.         # 这里可将 event 推送到前端
  15.     state["output"] = "".join(chunks)
  16.     return state
  17. graph = StateGraph(StreamState)
  18. graph.add_node("gen", gen)
  19. graph.set_entry_point("gen")
  20. graph.add_edge("gen", END)
  21. app = graph.compile()
  22. async def main():
  23.     result = await app.ainvoke({"prompt": "请分点说明 LangGraph 的优势"})
  24.     print(result["output"])
  25. if __name__ == "__main__":
  26.     asyncio.run(main())
复制代码
提示:不同 LLM SDK 的流式接口略有差异,请按你选用的 SDK 文档调整(如 astream_events() / on_token() 回调)。

8. 最佳实践建议

    状态结构要清晰:使用 TypedDict/Dataclass 明确字段含义,保证节点输入/输出一致。节点纯函数化:尽量让节点只根据输入状态产生输出,方便测试与复用。路由逻辑收敛:复杂条件用单独的路由函数,避免在节点里掺杂太多分支。可观察性:为关键节点加日志/事件推送,便于调试与性能监控。持久化:生产环境使用数据库型 Checkpointer(如 SQLite、Postgres),保证恢复正确。异常与重试:配合循环或重试节点处理不稳定外部依赖(API、工具)。

原文地址:https://blog.csdn.net/nielilijy/article/details/156396469
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )