开启左侧

LangGraph框架 状态图&节点&边&Send

[复制链接]
创想小编 发表于 昨天 14:06 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:CSDN博客
什么是LangGraph

Langgraph是一个用于构建循环、有状态的多智能体(Multi-agent)应用的框架,它的特点是:
    持久执行:构建能够在失败后持续运行、长时间运行并从中断处恢复的代理。
    人机交互:通过在任何时候检查和修改代理状态,融入人工监督。
    综合记忆: 创建具有短期工作记忆以进行持续推理和跨会话长时记忆的有状态代理。
    使用LangSmith调试:通过可视化工具深入了解复杂代理行为,追踪执行路径、捕获状态转移,并提供详细的运行时指标。
    简易部署:借助可扩展的基础架构,自信地部署复杂的代理系统,该基础架构旨在应对有状态、长时间运行的工作流所面临的独特挑战。
Langgraph与langchain区别

首先,LangGraph和LangChain是两个相关但不同的工具,都来自LangChain生态系统
LangChain

LangChain是一个用于构建大语言模型应用程序的框架
    线性工作流:主要支持顺序执行的链式操作
    组件库:提供丰富的预构建组件,如提示模板、向量存储、检索器等
    简单集成:易于快速原型开发和简单的LLM应用
    抽象层:为不同的LLM提供统一接口
    中间件:提供十多种预构建中间件和自定义中间件去进行Agent执行中的拦截
LangGraph

LangGraph是LangChain团队开发的更高级工具,专门用于构建复杂的多智能体系统:
    图状工作流:支持复杂的分支、循环和条件逻辑
    状态管理:内置强大的状态管理机制
    多智能体协作:原生支持多个AI智能体之间的交互
    复杂决策流:可以根据条件动态选择执行路径
    持久化:支持长时间运行的工作流和状态持久化
总而言之,Langchain的工作流程很简单,就是用户Query——向量检索——模型生成——格式化输出,对于那些流程明确的工作(比如文本翻译、问答),就适合langchain框架。
langgraph更适合解决复杂的任务,langgraph原生支持持久化不需要像langchain一样专门存储历史记录,langgraph支持循环结构,允许模型在犯错的时候从上一步重新开始,同时langgraph支持多智能体协作,在一些重要决策前加入人工判断的步骤。
langgraph的基础知识

langgraph安装
  1. pip install -U "langgraph"
复制代码
核心基础

State(状态)整个工作流中被传输和存储的数据的数据结构
Node(结点)工作流中执行的一个步骤(调用函数、工具)
Edge(边)控制流程下一步应该去哪个结点
Graph(流程图)将所有边和结点组织成一张流程图
LangGraph 的核心是将代理工作流程建模为图表,它有三个核心组件:
    State:通常是一个 TypedDictPydantic的BaseModel,表示当前流程共享的数据结构
    Node:接收当前值state作为输入,执行计算并返回一个新的state
    Edge:根据当前条件决定下一步执行哪个Node
通过组合Node和Edge,您可以创建复杂的循环工作流,使其state随时间推移而演化。然而,真正的强大之处在于 LangGraph 对 的管理发送state
状态图Stategraph

Stategraph是主要使用的python图形类,这个类提供了很多方法:
    向图中添加执行单元(节点)
    在节点之间连线
    添加条件分支
    指定图的起点
    验证图是否正确(是否有孤立结点)
为什么叫“状态图”而不是“流程图”
LangGraph 不只是流程控制,还强调:
    每个节点执行前、执行后都可以访问和修改状态(state)
    状态在节点之间流动
    节点的跳转可以依据状态来判断
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph
  3. # 定义状态结构
  4. class MyState(TypedDict):
  5.     question: str
  6.     answer: str
  7. # 定义节点函数
  8. def search_node(state):
  9.     return {"answer": "这是答案"}
  10. # 创建状态图
  11. builder = StateGraph(state_schema=MyState)
  12. # 添加一个节点
  13. builder.add_node("search", search_node)
  14. # 第一个要调用的节点
  15. builder.set_entry_point("search")
  16. # 要构建图,首先要定义状态,然后添加节点和边,最后进行编译,会进行基本的检查
  17. graph = builder.compile()
  18. # 执行图
  19. result = graph.invoke({"question": "什么是状态图?"})
  20. print(result["answer"])  # 输出:这是答案
复制代码
状态State

在使用 LangGraph 构建流程图之前,第一件事就是定义图的状态 State。这是整个图运行中用于共享和传递信息的核心机制。
LangGraph 中的 State 是图中所有节点(Node)之间传递数据的模式结构,可以类比为一个共享的上下文字典,它包含输入、输出、中间变量等
定义 State 时,需要包含两个部分:
Schema(模式):指定 State 的字段结构(一般用TypedDict或Pydanic)
  1. # langgraph推荐使用TypedDict
  2. """
  3. 1. TypedDict 是标准库的一部分(来自 typing 模块),零依赖,零性能开销而 Pydantic 会在每一步创建模型实例,会增加运行时负担
  4. 2. LangGraph 中的 State 实质就是一个字典(dict),而 TypedDict 就是“有类型注解的 dict”,与 LangGraph 的执行机制无缝对接,而 Pydantic 是类结构,需要 .dict() 转换,略显多余
  5. """
  6. from typing import TypedDict
  7. class State1(TypedDict):
  8.     user_input: str
  9. # 使用 pydantic 可以进行参数校验和提供默认值
  10. from pydantic import BaseModel
  11. class State2(BaseModel):
  12.     question: str
  13.     result: str = ""
复制代码
多个模式(Multiple Schemas):在大多数情况下,LangGraph 使用一个统一的 State 模式。但你也可以设置“输入模式”和“输出模式”分开
    输入模式:接收用户输入的字段(如question)
  • 输出模式:只保留最终输出的字段(如final_answer)
    1. from typing import TypedDict
    2. from langgraph.graph import StateGraph
    3. # 1. 定义输入、输出、图内部的状态结构
    4. # 输入字段:用户的问题
    5. class InputState(TypedDict):
    6.     question: str
    7. # 中间状态:包括中间结果
    8. class InternalState(TypedDict):
    9.     question: str
    10.     search_result: str
    11.     final_answer: str
    12. # 输出字段:只想返回最终答案
    13. class OutputState(TypedDict):
    14.     final_answer: str
    15. # 2. 定义节点函数(中间节点用中间字段)
    16. def search_node(state: InternalState) -> dict:
    17.     return {"search_result": f"搜索了:{state['question']}"}
    18. def answer_node(state: InternalState) -> dict:
    19.     return {"final_answer": f"根据搜索结果:{state['search_result']},这是答案"}
    20. # 3. 创建 StateGraph,显式指定输入/输出 Schema
    21. builder = StateGraph(state_schema=InternalState,
    22.                      input_schema=InputState,
    23.                      output_schema=OutputState)
    24. # 4. 添加节点
    25. builder.add_node("search", search_node)
    26. builder.add_node("answer", answer_node)
    27. # 5. 配置流程
    28. builder.set_entry_point("search")
    29. builder.add_edge("search", "answer")
    30. # 6. 编译并执行图
    31. app = builder.compile()
    32. result = app.invoke({"question": "什么是LangGraph?"})
    33. print(result)  # {'final_answer': '根据搜索结果:搜索了:什么是LangGraph?,这是答案'}
    复制代码
    -> dict:返回值注解,表示该函数应该返回一个 dict 类型的对象
    Reducer(归并函数):在 LangGraph 中,所有节点返回的都是“局部更新结果”,Reducer 是用于合并多个节点输出更新的机制将每个节点返回的“局部状态更新”统一合并进全局的 State
    在TypeDict 中,我们通过Annotated来给某个字段指定 Reducer 函数
Annotated[类型, Reducer函数] :这里的 add 表示:每次节点返回这个字段时,都把新列表拼接到旧列表后面

    1. from typing import Annotated
    2. from typing_extensions import TypedDict
    3. from operator import add
    4. class State(TypedDict):     
    5.     foo: int     
    6.     bar: Annotated[list[str], add] # 每条消息是 {role, content},会自动追加到列表末尾
    复制代码
    在长任务中,state会随着任务轮次的增加变得臃肿,容易导致解析变慢或者上下文溢出,为了解决这种现象,需要引入reducer机制

在图表中使用消息

在许多情况下,将之前的对话历史记录以消息列表的形式存储在图状态中会很有帮助。为此,我们可以向图状态添加一个键(通道),该键存储Message对象列表,并使用 Reducer 函数对其进行注释。Reducer 函数对于指示图如何Message在每次状态更新(例如,当节点发送更新时)时更新状态中的对象列表至关重要。如果您未指定 Reducer,则每次状态更新都会用最新提供的值覆盖消息列表。如果您只想将消息附加到现有列表中,可以使用operator。
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph
  3. from typing import Annotated
  4. import operator
  5. # 定义状态结构   如果定义的是list[dict],会覆盖之前的数据
  6. class ChatState(TypedDict):
  7.     messages: Annotated[list, operator.add]  # 每条消息是 {role, content},会自动追加到列表末尾
  8. # 节点函数:添加用户问题
  9. def user_input_node(state: ChatState) -> dict:
  10.     user_msg = {"role": "user", "content": "什么是LangGraph?"}
  11.     return {"messages": [user_msg]}
  12. # 节点函数:添加助手回复
  13. def assistant_node(state: ChatState) -> dict:
  14.     reply = {"role": "assistant", "content": "LangGraph 是一个有状态的图编排框架。"}
  15.     return {"messages": [reply]}
  16. # 构建状态图
  17. builder = StateGraph(state_schema=ChatState)
  18. builder.add_node("user_input", user_input_node)
  19. builder.add_node("assistant_reply", assistant_node)
  20. builder.set_entry_point("user_input")
  21. builder.add_edge("user_input", "assistant_reply")
  22. graph = builder.compile()
  23. result = graph.invoke({"messages": []})
  24. print(result["messages"])
复制代码
MessagesState

MessagesState是langgraph中自带的状态模板,它能够自动存储对话历史并自动追加消息,不需要写typedict和reducer
  1. from langgraph.graph import MessagesState
  2. # 和上述代码不同会在State类中自动维护一个messages 字段,不需要显示创建
  3. class State(MessagesState):
  4.     documents: list[str]
复制代码
节点Nodes

节点(Nodes)是图中执行逻辑的基本单位。每个节点表示一个函数步骤、处理阶段或子逻辑流程,多个节点通过边连接成有向图,组成一个完整的有状态计算流程。
LangGraph 中的节点就是你定义的一个函数(或 Runnable 对象),用于接收状态、执行逻辑,并返回更新后的状态
  1. def my_node(state: dict) -> dict:
  2.     # 处理输入状态,并返回更新字段
  3.     return {"new_key": "new_value"}
  4.    
  5. # LangGraph 会自动用 reducer 把这些更新合并进全局状态。
复制代码
START节点

Node START是一个特殊节点,表示将用户输入发送到图的节点。引用此节点的主要目的是确定应首先调用哪些节点。
  1. from langgraph.graph import START
  2. graph.add_edge(START, "node_a")
复制代码
add_edge负责把两个节点连接起来:workflow.add_edge(上一个节点, 下一个节点)
End节点

Node End是一个特殊节点,表示终端节点。当需要指示哪些边在完成后没有操作时,可以引用此节点。
  1. from langgraph.graph import END
  2. graph.add_edge("node_a", END)
复制代码
并行运行节点

并行运行节点指的是同时启动多个不互相依赖的节点,而不是按顺序一个接一个地执行。
langgraph中实现并行只需要将同一个起点的边指向多个不同的节点即可
  1. # 当 start_node 运行结束时,A 和 B 会同时启动
  2. workflow.add_edge("start_node", "node_A")
  3. workflow.add_edge("start_node", "node_B")
复制代码
边Edge

Edge(边)是连接节点的通道,表示图中节点之间的执行跳转关系。你可以把它理解为「节点执行完之后,下一步去哪,是构成 LangGraph 流程图的核心
普通边
  1. graph.add_edge("节点A", "节点B")
复制代码
条件边

条件边会根据state里的内容,判断下一步执行哪一个逻辑函数
  1. def route_condition(state: MyState):
  2.     """条件函数:只负责路由决策"""
  3.     if state["type"] == "a":
  4.         return "a"
  5.     elif state["type"] == "b":
  6.         return "b"
  7.     else:
  8.         return "default"
  9. def node_a(state):
  10.     return {"result": "走了 A 分支"}
  11. def node_b(state):
  12.     return {"result": "走了 B 分支"}
  13. def node_default(state):
  14.     return {"result": "走了默认分支"}
  15. graph.add_conditional_edges("judge_node", route_condition, {
  16.     "a": "a",
  17.     "b": "b",
  18.     "default": "default"
  19. })
复制代码
条件边的格式
workflow.add_conditional_edges(
    "起点节点",
    routing_function,      # 判断逻辑函数
    {"path_a": "节点A", "path_b": "节点B"} # 路由映射表
)
条件入口点

根据初始输入的数据,动态决定首先运行哪个节点
  1. from langgraph.graph import StateGraph, START, END
  2. # 1. 定义路由函数
  3. def route_start(state: State):
  4.     if state["question"].startswith("查资料"):
  5.         return "search_node"
  6.     else:
  7.         return "chat_node"
  8. # 2. 构建图
  9. builder = StateGraph(State)
  10. builder.add_node("search_node", search_func)
  11. builder.add_node("chat_node", chat_func)
  12. # 3. 设置条件入口点:从 START 开始进行条件跳转
  13. builder.add_conditional_edges(
  14.     START,           # 起点是内置的 START
  15.     route_start,     # 判断函数
  16.     {
  17.         "search_node": "search_node",
  18.         "chat_node": "chat_node"
  19.     }
  20. )
复制代码
发送Send

默认情况下,Nodes和Edges是提前定义的,并在相同的共享状态下运行。但是,在某些情况下,确切的边无法提前知道,并且可能希望同时存在State的不同版本。
也就是说,普通的边只能沿着图预设的路线走;而send可以强制跳到任意节点并传输数据,支持运行时动态地分发任务
    条件路由:根据某些条件将消息发送到不同的节点(可以直接用条件边实现,没必要用send)
    并行处理:同时向多个节点发送消息 (核心)
    动态工作流:根据运行时状态决定消息的发送目标
Map-Reduce

Map-Reduce就是send的工作原理,它是一种经典的并行计算模式,特别适合处理大规模数据
Map-Reduce 将复杂的数据处理任务分解为两个阶段:
    Map(分发):Send将主状态中的一个列表拆开,并行启动多个目标节点。每个节点只拿到属于它的那部分数据。
    Process(处理):这些节点同时运行,互不干扰。
    Reduce(聚合):当所有并行的节点都跑完后,将所有小任务的结果合并成最终结果
标准结构是  Send(目标节点名称, 要传递的状态数据)
Send("node_name", {"key": value})
  1. """
  2. LangGraph Map-Reduce 简单案例:数字求和
  3. 把一堆数字分给多个worker算平方,然后把结果加起来
  4. """
  5. from typing import Annotated
  6. import operator
  7. from langgraph.graph import StateGraph, START, END
  8. from langgraph.constants import Send
  9. from typing import TypedDict, List
  10. # 状态定义
  11. class State(TypedDict):
  12.     numbers: List[int]        # 输入的数字
  13.     results: Annotated[list[int], operator.add]        # worker的结果
  14.     final_sum: int           # 最终求和
  15. # 1. Map阶段:分发数字
  16. def split_numbers(state: State):
  17.     """把数字分发给不同的worker"""
  18.     numbers = state["numbers"]
  19.     print(f"分发数字: {numbers}")
  20.     # 每个数字发给一个worker
  21.     return [Send("worker", {"number": num}) for num in numbers]
  22. # 2. Worker阶段:计算平方
  23. def calculate_square(state: State):
  24.     """每个worker计算一个数字的平方"""
  25.     number = state["number"]
  26.     square = number * number
  27.     print(f"Worker: {number}² = {square}")
  28.     return {"results": [square]}
  29. # 3. Reduce阶段:求和
  30. def sum_results(state: State):
  31.     """把所有结果加起来"""
  32.     results = state.get("results", [])
  33.     total = sum(results)
  34.     print(f"求和: {results} = {total}")
  35.     return {"final_sum": total}
  36. # 构建图
  37. def create_simple_graph():
  38.     graph = StateGraph(State)
  39.     # 添加节点
  40.     graph.add_node("splitter", lambda s: s)  # 分发器
  41.     graph.add_node("worker", calculate_square)  # 工作节点
  42.     graph.add_node("summer", sum_results)      # 求和器
  43.     # 连接节点
  44.     graph.add_edge(START, "splitter")
  45.     graph.add_conditional_edges("splitter", split_numbers, ["worker"])  # Map阶段
  46.     graph.add_edge("worker", "summer")  # Worker完成后求和
  47.     graph.add_edge("summer", END)
复制代码
注意这里,graph.add_node("worker",calculate_square)
第一个字符串参数是节点中的标识符
第二个参数是具体要执行的函数
这里解释两个点
    分发器:graph.add_node("splitter", lambda s: s);这种节点经常用作数据分流(路由)的起点,lambda是python中的一个匿名函数,lambda s: s说明这个函数传入什么就输出什么;这个看起来什么也不干的“splitter”节点就是负责接收数据然后分发给每个worker节点。
    Map分发过程:graph.add_conditional_edges("splitter", split_numbers, ["worker"]),这里Send已经指定了要去的节点因此不需要路由映射表split_numbers函数返回了n个Send,LangGraph 看到这些 Send,就知道要并行启动n个worker节点
命令Command

Command 是 LangGraph 中用于控制图执行流程、更新图状态,并支持人机交互、工具调用的标准化对象。
核心作用:

第一,更新图的运行状态;
第二,控制图的执行流向(指定下一个或多个执行节点);
第三,衔接中断恢复、工具调用、人机交互等场景
command参数

update:应用状态更新(类似于从节点返回更新)。
goto:导航到特定节点(类似于条件边,以后无需再stategraph里连线)。
graph:在从子图导航时定位到父图。
resume:在中断后提供一个值以继续执行。
在节点函数中返回command时,必须添加返回类型注释,其中包含节点路由到的节点名称列表,例如command[Literal ["my_other_node"]]。这对于图形渲染是必需的,它告诉 LangGraph 当前节点可以导航到my_other_node。
  1. from typing import TypedDict
  2. from langgraph.graph import StateGraph, END
  3. from langgraph.types import Command, Send
  4. from typing import Literal
  5. class MyState(TypedDict):
  6.     type: str
  7.     text: str
  8.     result: str
  9. def judge_node(state: MyState) -> Command[Literal["a", "b", "default"]]:
  10.     """条件函数:使用Command进行路由和状态更新"""
  11.     if state["type"] == "a":
  12.         return Command(update={"text": "走了 A 分支"}, goto="a")
  13.     elif state["type"] == "b":
  14.         return Command(update={"text": "走了 B 分支"}, goto="b")
  15.     else:
  16.         return Command(update={"text": "走了默认分支"}, goto="default")
  17. def node_a(state):
  18.     return {"result": f"A节点处理: {state['text']}"}
  19. def node_b(state):
  20.     return {"result": f"B节点处理: {state['text']}"}
  21. def node_default(state):
  22.     return {"result": f"默认节点处理: {state['text']}"}
  23. # 构建图
  24. graph = StateGraph(state_schema=MyState)
  25. graph.add_node("judge_node", judge_node)
  26. graph.add_node("a", node_a)
  27. graph.add_node("b", node_b)
  28. graph.add_node("default", node_default)
  29. graph.set_entry_point("judge_node")
  30. # 添加结束边
  31. graph.add_edge("a", END)
  32. graph.add_edge("b", END)
  33. graph.add_edge("default", END)
  34. app = graph.compile()
  35. # 测试
  36. print("测试 A:", app.invoke({"type": "a", "text": "", "result": ""}))
  37. print("测试 B:", app.invoke({"type": "b", "text": "", "result": ""}))
  38. print("测试其他:", app.invoke({"type": "default", "text": "", "result": ""}))
复制代码
在这里,我们使用了Command对象过后就不需要写builder.add_nodebuilder.add_conditional_edges函数了
什么时候应该使用Command而不是条件边

如果你想在节点内部既要改状态state,又要实现跳转,那么就必须使用Command,在条件边中跳转状态修改分散在两个函数里面,而command可以把这两个逻辑聚在一起。

原文地址:https://blog.csdn.net/jiaranran8/article/details/160506504
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )