AI创想

标题: LangGraph框架 状态图&节点&边&Send [打印本页]

作者: 创想小编    时间: 4 小时前
标题: LangGraph框架 状态图&节点&边&Send
作者:CSDN博客
什么是LangGraph

Langgraph是一个用于构建循环、有状态的多智能体(Multi-agent)应用的框架,它的特点是:
Langgraph与langchain区别

首先,LangGraph和LangChain是两个相关但不同的工具,都来自LangChain生态系统
LangChain

LangChain是一个用于构建大语言模型应用程序的框架
LangGraph

LangGraph是LangChain团队开发的更高级工具,专门用于构建复杂的多智能体系统:
总而言之,Langchain的工作流程很简单,就是用户Query——向量检索——模型生成——格式化输出,对于那些流程明确的工作(比如文本翻译、问答),就适合langchain框架。
langgraph更适合解决复杂的任务,langgraph原生支持持久化不需要像langchain一样专门存储历史记录,langgraph支持循环结构,允许模型在犯错的时候从上一步重新开始,同时langgraph支持多智能体协作,在一些重要决策前加入人工判断的步骤。
langgraph的基础知识

langgraph安装
  1. pip install -U "langgraph"
复制代码
核心基础

State(状态)整个工作流中被传输和存储的数据的数据结构
Node(结点)工作流中执行的一个步骤(调用函数、工具)
Edge(边)控制流程下一步应该去哪个结点
Graph(流程图)将所有边和结点组织成一张流程图
LangGraph 的核心是将代理工作流程建模为图表,它有三个核心组件:
通过组合Node和Edge,您可以创建复杂的循环工作流,使其state随时间推移而演化。然而,真正的强大之处在于 LangGraph 对 的管理发送state
状态图Stategraph

Stategraph是主要使用的python图形类,这个类提供了很多方法:
为什么叫“状态图”而不是“流程图”
LangGraph 不只是流程控制,还强调:
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph
  3. # 定义状态结构
  4. class MyState(TypedDict):
  5.     question: str
  6.     answer: str
  7. # 定义节点函数
  8. def search_node(state):
  9.     return {"answer": "这是答案"}
  10. # 创建状态图
  11. builder = StateGraph(state_schema=MyState)
  12. # 添加一个节点
  13. builder.add_node("search", search_node)
  14. # 第一个要调用的节点
  15. builder.set_entry_point("search")
  16. # 要构建图,首先要定义状态,然后添加节点和边,最后进行编译,会进行基本的检查
  17. graph = builder.compile()
  18. # 执行图
  19. result = graph.invoke({"question": "什么是状态图?"})
  20. print(result["answer"])  # 输出:这是答案
复制代码
状态State

在使用 LangGraph 构建流程图之前,第一件事就是定义图的状态 State。这是整个图运行中用于共享和传递信息的核心机制。
LangGraph 中的 State 是图中所有节点(Node)之间传递数据的模式结构,可以类比为一个共享的上下文字典,它包含输入、输出、中间变量等
定义 State 时,需要包含两个部分:
Schema(模式):指定 State 的字段结构(一般用TypedDict或Pydanic)
  1. # langgraph推荐使用TypedDict
  2. """
  3. 1. TypedDict 是标准库的一部分(来自 typing 模块),零依赖,零性能开销而 Pydantic 会在每一步创建模型实例,会增加运行时负担
  4. 2. LangGraph 中的 State 实质就是一个字典(dict),而 TypedDict 就是“有类型注解的 dict”,与 LangGraph 的执行机制无缝对接,而 Pydantic 是类结构,需要 .dict() 转换,略显多余
  5. """
  6. from typing import TypedDict
  7. class State1(TypedDict):
  8.     user_input: str
  9. # 使用 pydantic 可以进行参数校验和提供默认值
  10. from pydantic import BaseModel
  11. class State2(BaseModel):
  12.     question: str
  13.     result: str = ""
复制代码
多个模式(Multiple Schemas):在大多数情况下,LangGraph 使用一个统一的 State 模式。但你也可以设置“输入模式”和“输出模式”分开
Annotated[类型, Reducer函数] :这里的 add 表示:每次节点返回这个字段时,都把新列表拼接到旧列表后面
在图表中使用消息

在许多情况下,将之前的对话历史记录以消息列表的形式存储在图状态中会很有帮助。为此,我们可以向图状态添加一个键(通道),该键存储Message对象列表,并使用 Reducer 函数对其进行注释。Reducer 函数对于指示图如何Message在每次状态更新(例如,当节点发送更新时)时更新状态中的对象列表至关重要。如果您未指定 Reducer,则每次状态更新都会用最新提供的值覆盖消息列表。如果您只想将消息附加到现有列表中,可以使用operator。
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph
  3. from typing import Annotated
  4. import operator
  5. # 定义状态结构   如果定义的是list[dict],会覆盖之前的数据
  6. class ChatState(TypedDict):
  7.     messages: Annotated[list, operator.add]  # 每条消息是 {role, content},会自动追加到列表末尾
  8. # 节点函数:添加用户问题
  9. def user_input_node(state: ChatState) -> dict:
  10.     user_msg = {"role": "user", "content": "什么是LangGraph?"}
  11.     return {"messages": [user_msg]}
  12. # 节点函数:添加助手回复
  13. def assistant_node(state: ChatState) -> dict:
  14.     reply = {"role": "assistant", "content": "LangGraph 是一个有状态的图编排框架。"}
  15.     return {"messages": [reply]}
  16. # 构建状态图
  17. builder = StateGraph(state_schema=ChatState)
  18. builder.add_node("user_input", user_input_node)
  19. builder.add_node("assistant_reply", assistant_node)
  20. builder.set_entry_point("user_input")
  21. builder.add_edge("user_input", "assistant_reply")
  22. graph = builder.compile()
  23. result = graph.invoke({"messages": []})
  24. print(result["messages"])
复制代码
MessagesState

MessagesState是langgraph中自带的状态模板,它能够自动存储对话历史并自动追加消息,不需要写typedict和reducer
  1. from langgraph.graph import MessagesState
  2. # 和上述代码不同会在State类中自动维护一个messages 字段,不需要显示创建
  3. class State(MessagesState):
  4.     documents: list[str]
复制代码
节点Nodes

节点(Nodes)是图中执行逻辑的基本单位。每个节点表示一个函数步骤、处理阶段或子逻辑流程,多个节点通过边连接成有向图,组成一个完整的有状态计算流程。
LangGraph 中的节点就是你定义的一个函数(或 Runnable 对象),用于接收状态、执行逻辑,并返回更新后的状态
  1. def my_node(state: dict) -> dict:
  2.     # 处理输入状态,并返回更新字段
  3.     return {"new_key": "new_value"}
  4.    
  5. # LangGraph 会自动用 reducer 把这些更新合并进全局状态。
复制代码
START节点

Node START是一个特殊节点,表示将用户输入发送到图的节点。引用此节点的主要目的是确定应首先调用哪些节点。
  1. from langgraph.graph import START
  2. graph.add_edge(START, "node_a")
复制代码
add_edge负责把两个节点连接起来:workflow.add_edge(上一个节点, 下一个节点)
End节点

Node End是一个特殊节点,表示终端节点。当需要指示哪些边在完成后没有操作时,可以引用此节点。
  1. from langgraph.graph import END
  2. graph.add_edge("node_a", END)
复制代码
并行运行节点

并行运行节点指的是同时启动多个不互相依赖的节点,而不是按顺序一个接一个地执行。
langgraph中实现并行只需要将同一个起点的边指向多个不同的节点即可
  1. # 当 start_node 运行结束时,A 和 B 会同时启动
  2. workflow.add_edge("start_node", "node_A")
  3. workflow.add_edge("start_node", "node_B")
复制代码
边Edge

Edge(边)是连接节点的通道,表示图中节点之间的执行跳转关系。你可以把它理解为「节点执行完之后,下一步去哪,是构成 LangGraph 流程图的核心
普通边
  1. graph.add_edge("节点A", "节点B")
复制代码
条件边

条件边会根据state里的内容,判断下一步执行哪一个逻辑函数
  1. def route_condition(state: MyState):
  2.     """条件函数:只负责路由决策"""
  3.     if state["type"] == "a":
  4.         return "a"
  5.     elif state["type"] == "b":
  6.         return "b"
  7.     else:
  8.         return "default"
  9. def node_a(state):
  10.     return {"result": "走了 A 分支"}
  11. def node_b(state):
  12.     return {"result": "走了 B 分支"}
  13. def node_default(state):
  14.     return {"result": "走了默认分支"}
  15. graph.add_conditional_edges("judge_node", route_condition, {
  16.     "a": "a",
  17.     "b": "b",
  18.     "default": "default"
  19. })
复制代码
条件边的格式
workflow.add_conditional_edges(
    "起点节点",
    routing_function,      # 判断逻辑函数
    {"path_a": "节点A", "path_b": "节点B"} # 路由映射表
)
条件入口点

根据初始输入的数据,动态决定首先运行哪个节点
  1. from langgraph.graph import StateGraph, START, END
  2. # 1. 定义路由函数
  3. def route_start(state: State):
  4.     if state["question"].startswith("查资料"):
  5.         return "search_node"
  6.     else:
  7.         return "chat_node"
  8. # 2. 构建图
  9. builder = StateGraph(State)
  10. builder.add_node("search_node", search_func)
  11. builder.add_node("chat_node", chat_func)
  12. # 3. 设置条件入口点:从 START 开始进行条件跳转
  13. builder.add_conditional_edges(
  14.     START,           # 起点是内置的 START
  15.     route_start,     # 判断函数
  16.     {
  17.         "search_node": "search_node",
  18.         "chat_node": "chat_node"
  19.     }
  20. )
复制代码
发送Send

默认情况下,Nodes和Edges是提前定义的,并在相同的共享状态下运行。但是,在某些情况下,确切的边无法提前知道,并且可能希望同时存在State的不同版本。
也就是说,普通的边只能沿着图预设的路线走;而send可以强制跳到任意节点并传输数据,支持运行时动态地分发任务
Map-Reduce

Map-Reduce就是send的工作原理,它是一种经典的并行计算模式,特别适合处理大规模数据
Map-Reduce 将复杂的数据处理任务分解为两个阶段:
标准结构是  Send(目标节点名称, 要传递的状态数据)
Send("node_name", {"key": value})
  1. """
  2. LangGraph Map-Reduce 简单案例:数字求和
  3. 把一堆数字分给多个worker算平方,然后把结果加起来
  4. """
  5. from typing import Annotated
  6. import operator
  7. from langgraph.graph import StateGraph, START, END
  8. from langgraph.constants import Send
  9. from typing import TypedDict, List
  10. # 状态定义
  11. class State(TypedDict):
  12.     numbers: List[int]        # 输入的数字
  13.     results: Annotated[list[int], operator.add]        # worker的结果
  14.     final_sum: int           # 最终求和
  15. # 1. Map阶段:分发数字
  16. def split_numbers(state: State):
  17.     """把数字分发给不同的worker"""
  18.     numbers = state["numbers"]
  19.     print(f"分发数字: {numbers}")
  20.     # 每个数字发给一个worker
  21.     return [Send("worker", {"number": num}) for num in numbers]
  22. # 2. Worker阶段:计算平方
  23. def calculate_square(state: State):
  24.     """每个worker计算一个数字的平方"""
  25.     number = state["number"]
  26.     square = number * number
  27.     print(f"Worker: {number}² = {square}")
  28.     return {"results": [square]}
  29. # 3. Reduce阶段:求和
  30. def sum_results(state: State):
  31.     """把所有结果加起来"""
  32.     results = state.get("results", [])
  33.     total = sum(results)
  34.     print(f"求和: {results} = {total}")
  35.     return {"final_sum": total}
  36. # 构建图
  37. def create_simple_graph():
  38.     graph = StateGraph(State)
  39.     # 添加节点
  40.     graph.add_node("splitter", lambda s: s)  # 分发器
  41.     graph.add_node("worker", calculate_square)  # 工作节点
  42.     graph.add_node("summer", sum_results)      # 求和器
  43.     # 连接节点
  44.     graph.add_edge(START, "splitter")
  45.     graph.add_conditional_edges("splitter", split_numbers, ["worker"])  # Map阶段
  46.     graph.add_edge("worker", "summer")  # Worker完成后求和
  47.     graph.add_edge("summer", END)
复制代码
注意这里,graph.add_node("worker",calculate_square)
第一个字符串参数是节点中的标识符
第二个参数是具体要执行的函数
这里解释两个点
命令Command

Command 是 LangGraph 中用于控制图执行流程、更新图状态,并支持人机交互、工具调用的标准化对象。
核心作用:

第一,更新图的运行状态;
第二,控制图的执行流向(指定下一个或多个执行节点);
第三,衔接中断恢复、工具调用、人机交互等场景
command参数

update:应用状态更新(类似于从节点返回更新)。
goto:导航到特定节点(类似于条件边,以后无需再stategraph里连线)。
graph:在从子图导航时定位到父图。
resume:在中断后提供一个值以继续执行。
在节点函数中返回command时,必须添加返回类型注释,其中包含节点路由到的节点名称列表,例如command[Literal ["my_other_node"]]。这对于图形渲染是必需的,它告诉 LangGraph 当前节点可以导航到my_other_node。
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph, END
  3. from langgraph.types import Command, Send
  4. from typing import Literal
  5. class MyState(TypedDict):
  6.     type: str
  7.     text: str
  8.     result: str
  9. def judge_node(state: MyState) -> Command[Literal["a", "b", "default"]]:
  10.     """条件函数:使用Command进行路由和状态更新"""
  11.     if state["type"] == "a":
  12.         return Command(update={"text": "走了 A 分支"}, goto="a")
  13.     elif state["type"] == "b":
  14.         return Command(update={"text": "走了 B 分支"}, goto="b")
  15.     else:
  16.         return Command(update={"text": "走了默认分支"}, goto="default")
  17. def node_a(state):
  18.     return {"result": f"A节点处理: {state['text']}"}
  19. def node_b(state):
  20.     return {"result": f"B节点处理: {state['text']}"}
  21. def node_default(state):
  22.     return {"result": f"默认节点处理: {state['text']}"}
  23. # 构建图
  24. graph = StateGraph(state_schema=MyState)
  25. graph.add_node("judge_node", judge_node)
  26. graph.add_node("a", node_a)
  27. graph.add_node("b", node_b)
  28. graph.add_node("default", node_default)
  29. graph.set_entry_point("judge_node")
  30. # 添加结束边
  31. graph.add_edge("a", END)
  32. graph.add_edge("b", END)
  33. graph.add_edge("default", END)
  34. app = graph.compile()
  35. # 测试
  36. print("测试 A:", app.invoke({"type": "a", "text": "", "result": ""}))
  37. print("测试 B:", app.invoke({"type": "b", "text": "", "result": ""}))
  38. print("测试其他:", app.invoke({"type": "default", "text": "", "result": ""}))
复制代码
在这里,我们使用了Command对象过后就不需要写builder.add_nodebuilder.add_conditional_edges函数了
什么时候应该使用Command而不是条件边

如果你想在节点内部既要改状态state,又要实现跳转,那么就必须使用Command,在条件边中跳转状态修改分散在两个函数里面,而command可以把这两个逻辑聚在一起。

原文地址:https://blog.csdn.net/jiaranran8/article/details/160506504




欢迎光临 AI创想 (https://llms-ai.com/) Powered by Discuz! X3.4