作者:CSDN博客
LangGraph 概述
LangGraph 是一个低级编排框架和运行时环境,用于构建、管理和部署长期运行的有状
态智能体(agents)。核心理念是将Agent 工作流建模为图(Graph),其中:
**节点(Nodes):**代表计算单元,可以是LLM 调用、工具执行或任何自定义逻辑**边(Edges):**定义节点之间的转换逻辑,决定执行流程**状态(State):**在整个图执行过程中共享和传递的数据
LangGraph提供了构建生产级智能体应用的核心能力:
持久化执行:构建能够从故障中恢复并长时间运行的智能体人机协作:在任何时刻检查和修改智能体状态记忆管理:支持短期工作记忆和跨会话的长期记忆流式处理:专为流式工作流设计生产级部署:为有状态、长期运行的工作流提供可扩展的基础设施
与LangChain区别
| 特性 | LangGraph | LangChain | | 抽象级别 | 低级,提供细粒度控制 | 高级,提供开箱即用的链 | | 状态管理 | 内置状态机和检查点 | 需要自行管理状态 | | 执行模型 | 基于图的并行执行 | 线性链式执行 | | 持久化 | 原生支持 | 需要额外实现 | | 适用场景 | 复杂、有状态的工作流 | 简单的链式调用 | LangGraph架构设计
**1)****Pregel **架构
LangGraph 的运行时基于 Google 的 Pregel 算法,这是一种用于大规模并行图计算的模型。执行过程分为三个阶段:
(1)Plan(规划):确定本轮要执行的节点
(2)Execution(执行):并行执行所有选中的节点
(3)Update(更新):将节点输出更新到通道(channels)
每个执行轮次称为一个”超步(super-step)”,系统会持续迭代直到没有节点需要执行。
**2)**Actors (PregelNode)
订阅通道、读取和写入数据的节点,实现LangChain的Runnable接口。
3)Channels(通道)
用于actors之间通信,包括:
LastValue:存储最后发送的值Topic:可配置的发布-订阅主题BinaryOperatorAggregate:用于聚合操作
Graph API
从核心来看,LangGraph 将智能体工作流建模为图。你可以使用三个关键组件来定义智能体的行为:
(1)State:一种共享数据结构,用于表示应用程序的当前快照。它可以是任何数据类型,但通常使用共享状态模式来定义。类似于store。
(2)Nodes:对智能体逻辑进行编码的函数。它们接收当前状态作为输入,执行一些计算或副作用,并返回更新后的状态。
(3)Edges:根据当前状态确定下一个要执行的Node的函数。它们可以是条件分支或固定转换。
**节点执行工作,边指示下一步该做什么。**通过组合Nodes和Edges,可以创建复杂的、循环的工作流,这些工作流会随着时间推移不断演化状态。不过,真正的强大之处在于LangGraph对该状态的管理方式。Nodes和Edges只不过是函数——它们可以包含大语言模型,或者仅仅是一些传统代码。
StateGraph类是要使用的主要图类。它由用户定义的State对象进行参数化。要构建图,首先需要定义状态,然后添加节点和边,接着进行编译。
State
定义图时,首先要做的是定义图的State。State由图的schema以及reducer函数组成,其中reducer函数指定了如何对状态进行更新。State的schema将作为图中所有Nodes和Edges的输入模式,它可以是TypedDict或Pydantic模型。所有Nodes都会发出对State的更新,然后使用指定的reducer函数应用这些更新。
Schema
LangGraph 中 state_schema、input_schema 和 output_schema 这三个概念是用于管理图状态的不同方面:
**1)**state_schema
这是图的完整内部状态,包含了所有节点可能读写的字段,必须指定,不能为空。
特点:
是图的"全局状态空间"所有节点都可以访问和写入这个 schema 中的任何字段
**2)**input_schema
定义图接受什么输入,是 state_schema 的子集
特点:
可选参数,如果不指定,默认等于 state_schema限制图的输入接口,只能传入这些字段是 state_schema 的子集或相等
**3)**output_schema
定义图返回什么输出,是 state_schema 的子集
特点:
可选参数,如果不指定,默认等于 state_schema限制图的输出接口,只返回这些字段是 state_schema 的子集或相等
下图中展示了一个输入输出状态分离的图结构示例,其中OverallState为该图的主状态,其中包含了InputState和OutputState作为子输入和输出状态,用于被读取和被写入。OverallState继承了InputState和OutputState中所有的状态信息。
- from langgraph.graph import START,StateGraph,END
- from typing_extensions import TypedDict
- classInputState(TypedDict):
- question:strclassOutputState(TypedDict):
- answer:strclassOverallState(InputState,OutputState):passdefanswer_node(state:InputState):print(f"执行 answer_node 节点:")print(f" 输入: {state}")
- answer='再见'if'bye'in state['question'].lower()else'你好'
- res={'answer':answer,'question':state['question']}print(f" 输出: {res}")return res
- defdemo_input_output_schema():"""演示输入输出模式"""print("=== 演示输入输出模式 ===")# 使用指定的输入和输出模式构建图
- builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState)
- builder.add_node("answer_node", answer_node)# 添加答案节点
- builder.add_edge(START,"answer_node")# 定义起始边
- builder.add_edge("answer_node", END)# 定义结束边
- graph = builder.compile()# 编译图# 使用输入调用图并打印结果
- result = graph.invoke({"question":"你好啊123"})print(f"图调用结果: {result}")defmain():"""主函数"""# 演示输入输出模式
- demo_input_output_schema()print("=== 演示完成 ===")if __name__ =="__main__":
- main()# 图调用结果: {'answer': '你好'}# 如果把input_schema=InputState, output_schema=OutputState删掉输出结果为:# 图调用结果: {'question': '你好啊123', 'answer': '你好'}
复制代码 下图中展示了一个私有状态传递的图结构,OverallState为图中的主状态,其中node_1接收OverallState,将输出写入到私有状态Node1Output中的private_data字段中。但此时图中的主状态并没有该字段,于是将private_data字段与对应的数据构成的kv对暂存到RAM中。在进入下一个节点node_2的时候,接收参数为Node2Input中的同名字段private_data。图的管理行为会提取出该字段,并在暂存区寻找有无对应的字段,发现存在,对private_data进行消费,并将node_1赋的值传递到node_2中,在进行了消费之后销毁内存中的该字段,实现私有字段的传递。
- from langgraph.graph import START,StateGraph,END
- from typing_extensions import TypedDict
- # 定义整体状态(这是在节点间共享的公共状态)classOverallStatePrivate(TypedDict):
- a:str# node_1 的输出包含不属于整体状态的私有数据classNode1Output(TypedDict):
- private_data:str# 私有数据仅在 node_1 和 node_2 之间共享defnode_1(state: OverallStatePrivate)-> Node1Output:
- output ={"private_data":"由 node_1 设置"}print(f"进入 node_1 节点:")print(f" 输入: {state}")print(f" 返回: {output}")return output
- classNode2Input(TypedDict):
- private_data:strdefnode_2(state: Node2Input)-> OverallStatePrivate:
- output ={"a":"由 node_2 设置"}print(f"进入 node_2 节点:")print(f" 输入: {state}")print(f" 返回: {output}")return output
- # 节点3只能访问整体状态(无法访问node_1的私有数据)defnode_3(state: OverallStatePrivate)-> OverallStatePrivate:
- output ={"a":"由 node_3 设置"}print(f"进入 node_3 节点:")print(f" 输入: {state}")print(f" 返回: {output}")return output
- defdemo_private_state():"""演示私有状态传递"""print("=== 演示私有状态传递 ===")# 连接节点序列# node_2 接受来自 node_1 的私有数据,而# node_3 看不到来自 node_1 的私有数据
- builder = StateGraph(OverallStatePrivate).add_sequence([node_1, node_2, node_3])
- builder.add_edge(START,"node_1")
- graph = builder.compile()# 使用初始状态调用图
- response = graph.invoke({"a":"在开始时设置",})print()print(f"图调用的输出: {response}")print()defmain():"""主函数"""# 演示私有状态传递
- demo_private_state()print("=== 演示完成 ===")if __name__ =="__main__":
- main()
复制代码 Reducer
reducer是理解节点更新如何应用于State的关键,State中的每个键都有其独立的reducer函数。每个node的返回值中的每个key与全局state_schema中对应的key进行合并更新,具体更新逻辑取决于每个key指定的reducer函数。
Reducer常用函数有以下几种:
默认行为:未指定Reducer时使用覆盖更新add_messages:用于消息列表追加operator.add:用于列表追加或数值累加operator.mul:用于数值相乘自定义Reducer:支持用户自定义合并逻辑
默认覆盖
如果未明确指定reducer函数,则默认对该键的更新是覆盖行为。
- from typing import List
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- # 1. 默认Reducer(覆盖更新)classDefaultReducerState(TypedDict):
- foo:int
- bar: List[str]defnode_default_1(state: DefaultReducerState)->dict:return{"foo":2}defnode_default_2(state: DefaultReducerState)->dict:return{"bar":["bye"]}defrun_demo():
- builder = StateGraph(DefaultReducerState)
- builder.add_node("node1", node_default_1)
- builder.add_node("node2", node_default_2)
- builder.add_edge(START,"node1")
- builder.add_edge("node1","node2")
- builder.add_edge("node2", END)
- graph = builder.compile()
-
- result = graph.invoke({"foo":1,"bar":["hi"]})print(f"初始状态: {{'foo': 1, 'bar': ['hi']}}")print(f"执行结果: {result}\n")if __name__ =="__main__":
- run_demo()
复制代码 add_messages
专门用于处理消息列表,当使用 add_messages时,消息会被转换为 LangChain 的消息对象(如 HumanMessage、AIMessage 等)。
- from typing import Annotated, List
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.graph.message import add_messages
- # 2. add_messages Reducer(消息列表专用)classAddMessagesState(TypedDict):
- messages: Annotated[List, add_messages]defchat_node_1(state: AddMessagesState)->dict:return{"messages":[("assistant","Hello from node 1")]}defchat_node_2(state: AddMessagesState)->dict:return{"messages":[("assistant","Hello from node 2")]}defrun_demo():
- builder = StateGraph(AddMessagesState)
- builder.add_node("chat1", chat_node_1)
- builder.add_node("chat2", chat_node_2)
- builder.add_edge(START,"chat1")
- builder.add_edge(START,"chat2")# 并行执行
- builder.add_edge("chat1", END)
- builder.add_edge("chat2", END)
- graph = builder.compile()
-
- result = graph.invoke({"messages":[("user","Hi there!")]})print(f"初始状态: {{'messages': [('user', 'Hi there!')]}}")print(f"执行结果: {result}\n")if __name__ =="__main__":
- run_demo()
复制代码 operator.add
将元素追加到现有元素中,支持列表、字符串、数值类型的追加
- import operator
- from typing import Annotated, List
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- classListAddState(TypedDict):
- data: Annotated[List[int], operator.add]# text: Annotated[str, operator.add] 字符串拼接只需修改数据类型即可# count: Annotated[int, operator.add] 数值累加同理defproducer_1(state: ListAddState)->dict:return{"data":[1,2]}defproducer_2(state: ListAddState)->dict:return{"data":[3,4]}defrun_demo():
- builder = StateGraph(ListAddState)
- builder.add_node("producer1", producer_1)
- builder.add_node("producer2", producer_2)
- builder.add_edge(START,"producer1")
- builder.add_edge(START,"producer2")# 并行执行
- builder.add_edge("producer1", END)
- builder.add_edge("producer2", END)
- graph = builder.compile()
-
- result = graph.invoke({"data":[0]})print(f"初始状态: {{'data': [0]}}")print(f"执行结果: {result}\n")if __name__ =="__main__":
- run_demo()
复制代码 operator.mul
用于数值字段的相乘操作,官方设计上存在缺陷。- import operator
- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- classMultiplyState(TypedDict):
- factor: Annotated[float, operator.mul]defmultiplier(state: MultiplyState)->dict:return{"factor":2.0}defrun_demo():"""
- !!!operator.mul实际使用上,官方设计存在bug:!!!
- 在执行初始阶段(我们定义的第一个node前),会默认调用一次reducer(后面自定义reducer案例中进行了打印验证),用默认值与invoke传递的值进行计算:
- 此案例中,invoke中传递了一个默认值5.0,由于会默认调用一次reducer,执行的计算是: 0.0(float默认值) * 5.0(invoke传递的初始值) = 0.0
- 导致后续乘法结果一直都是0
- 解决方案: 使用自定义reducer
- """
- builder = StateGraph(MultiplyState)
- builder.add_node("multiplier", multiplier)
- builder.add_edge(START,"multiplier")
- builder.add_edge("multiplier", END)
- graph = builder.compile()
- result = graph.invoke({"factor":5.0})print(f"初始状态: {{'factor': 5.0}}")print(f"执行结果: {result}\n")if __name__ =="__main__":
- run_demo()
复制代码 自定义Reducer函数
合并两个字典
- from typing import Annotated, Dict, Any
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- defcustom_reducer(current_value: Dict[str, Any], new_value: Dict[str, Any])-> Dict[str, Any]:"""合并两个字典,新值会覆盖旧值,但保留旧值中不存在的键"""
- result = current_value.copy()
- result.update(new_value)return result
- classCustomReducerState(TypedDict):
- metadata: Annotated[Dict[str, Any], custom_reducer]defupdate_metadata(state: CustomReducerState)->dict:return{"metadata":{"timestamp":"2025-01-01","version":"1.0"}}defrun_demo():
- builder = StateGraph(CustomReducerState)
- builder.add_node("update_metadata", update_metadata)
- builder.add_edge(START,"update_metadata")
- builder.add_edge("update_metadata", END)
- graph = builder.compile()
-
- result = graph.invoke({"metadata":{"user_id":"123","session":"abc"}})print(f"初始状态: {{'metadata': {{'user_id': '123', 'session': 'abc'}}}}")print(f"执行结果: {result}\n")if __name__ =="__main__":
- run_demo()
复制代码 自定义reducer实现数值乘法- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- # 使用全局变量来跟踪是否是第一次调用(初始化阶段)
- _is_initial_call =Truedefmy_mul_reducer(current_value:float, new_value:float)->float:global _is_initial_call
- print(f"Reducer被调用: current_value={current_value}, new_value={new_value}, is_initial_call={_is_initial_call}")# 如果是初始化调用,直接返回new_value,避免默认值0的影响if _is_initial_call:
- _is_initial_call =False# 重置标志return new_value
- else:# 正常的乘法操作,包括乘以0的情况return current_value * new_value
- classMultiplyState(TypedDict):
- factor: Annotated[float, my_mul_reducer]defmultiplier_by_two(state: MultiplyState)->dict:"""将factor乘以2"""return{"factor":2.0}defmultiplier_by_zero(state: MultiplyState)->dict:"""将factor乘以0"""return{"factor":0.0}defrun_demo():global _is_initial_call
- _is_initial_call =True# 重置初始化标志
- builder3 = StateGraph(MultiplyState)
- builder3.add_node("multiplier_by_two_1", multiplier_by_two)
- builder3.add_node("multiplier_by_zero", multiplier_by_zero)
- builder3.add_node("multiplier_by_two_2", multiplier_by_two)
- builder3.add_edge(START,"multiplier_by_two_1")
- builder3.add_edge("multiplier_by_two_1","multiplier_by_zero")
- builder3.add_edge("multiplier_by_zero","multiplier_by_two_2")
- builder3.add_edge("multiplier_by_two_2", END)
- graph3 = builder3.compile()
- result3 = graph3.invoke({"factor":3.0})print(f"初始状态: {{'factor': 3.0}}")print(f"执行结果: {result3}")print(f"预期过程: 3.0 -> 6.0 -> 0.0 -> 0.0")print(f"预期结果: 0.0\n")if __name__ =="__main__":
- run_demo()
复制代码 Nodes
在LangGraph中,节点是Python函数(可以是同步的,也可以是异步的),它们接受以下参数:
state:图的状态config:一个RunnableConfig对象,包含诸如thread_id之类的配置信息以及诸如tags之类的跟踪信息runtime:一个Runtime对象,包含运行时context以及其他信息,如store和stream_writer
定义好node函数后,使用add_node方法将这些节点添加到图中。如果在向图中添加节点时未指定名称,系统会为其分配一个与函数名相同的默认名称。
START Node
START节点是一个特殊节点,它代表着将用户输入发送到图中的节点。引用此节点的主要目的是确定应首先调用哪些节点。
END Node
END节点是一个特殊节点,代表终端节点。当你想表示哪些边在完成后没有动作时,会引用这个节点。
Node Caching
LangGraph支持基于节点输入对任务/节点进行缓存。使用缓存的方法如下:
编译图(或指定入口点)时设置cache=InMemoryCache()指定缓存。
为节点指定缓存策略。每个缓存策略支持:
key_func用于根据节点的输入生成缓存键,默认情况下是使用pickle对输入进行hash运算的结果。ttl,即缓存的生存时间(以秒为单位)。如果未指定,缓存将永不过期。
- import time
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph
- from langgraph.cache.memory import InMemoryCache
- from langgraph.types import CachePolicy
- # 定义状态classState(TypedDict):
- x:int
- result:int# 创建图
- builder = StateGraph(State)# 定义节点defexpensive_node(state: State)->dict[str,int]:# expensive computation
- time.sleep(2)return{"result": state["x"]*2}# 添加节点,并设置缓存策略
- builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))# 设置入口和出口
- builder.set_entry_point("expensive_node")
- builder.set_finish_point("expensive_node")# 编译图
- graph = builder.compile(cache=InMemoryCache())# 执行图print(graph.invoke({"x":5}, stream_mode='updates'))# [{'expensive_node': {'result': 10}}]# 第二次运行利用缓存并快速返回print(graph.invoke({"x":5}, stream_mode='updates'))# [{'expensive_node': {'result': 10}, '__metadata__': {'cached': True}}]
复制代码 添加重试策略
在很多使用场景中,你可能希望节点拥有自定义的重试策略,例如在调用API、查询数据库或调用大语言模型(LLM)等情况下。
为节点添加重试策略,需要在add_node中设置retry_policy参数。retry_policy参数接受一个RetryPolicy命名元组对象。默认情况下,retry_on参数使用default_retry_on函数,该函数会在遇到任何异常时重试,除了以下情况:
ValueError(值错误)TypeError(类型错误)ArithmeticError(算术错误)ImportError(导入错误)LookupError(查找错误)NameError(名称错误)SyntaxError(语法错误)RuntimeError(运行时错误)ReferenceError(引用错误)StopIteration(停止迭代)StopAsyncIteration(停止迭代)OSError(操作系统错误)
- from typing import Dict, Any
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import RetryPolicy
- # 定义状态classState(TypedDict):
- result:str# 模拟不稳定的API调用,使用全局变量跟踪尝试次数
- attempt_counter =0defunstable_api_call(state: State)-> Dict[str, Any]:global attempt_counter
- attempt_counter +=1print(f"尝试调用API,这是第 {attempt_counter} 次尝试")# 模拟前几次尝试失败,最后一次成功if attempt_counter <3:raise Exception(f"模拟API调用失败 (尝试 {attempt_counter})")else:# 第三次尝试成功return{"result":f"API调用成功,经过 {attempt_counter} 次尝试"}defrun_demo():# 重置全局计数器global attempt_counter
- attempt_counter =0
- builder1 = StateGraph(State)
- builder1.add_node("unstable_call",
- unstable_api_call,
- retry_policy=RetryPolicy(max_attempts=5)# 允许最多5次尝试)
- builder1.add_edge(START,"unstable_call")
- builder1.add_edge("unstable_call", END)
- graph1 = builder1.compile()print("测试默认重试策略:")try:
- result = graph1.invoke({"result":""})print(f"最终结果: {result}\n")except Exception as e:print(f"最终失败: {type(e).__name__}: {e}\n")if __name__ =="__main__":
- run_demo()
复制代码 延迟节点执行
延迟节点执行就是将某个节点的执行推迟到所有其他待处理任务完成后,这在分支长度不同的情况下尤其适用。通过设置defer=True来实现节点延迟执行。
- import operator
- from typing import Annotated, Any
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- classState(TypedDict):
- aggregate: Annotated[list, operator.add]defa(state: State):print(f'Adding "A" to {state["aggregate"]}')return{"aggregate":["A"]}defb(state: State):print(f'Adding "B" to {state["aggregate"]}')return{"aggregate":["B"]}defb_2(state: State):print(f'Adding "B_2" to {state["aggregate"]}')return{"aggregate":["B_2"]}defc(state: State):print(f'Adding "C" to {state["aggregate"]}')return{"aggregate":["C"]}defd(state: State):print(f'Adding "D" to {state["aggregate"]}')return{"aggregate":["D"]}# 创建图
- builder = StateGraph(State)# 添加节点
- builder.add_node("a", a)
- builder.add_node("b", b)
- builder.add_node("b_2", b_2)
- builder.add_node("c", c)
- builder.add_node("d", d, defer=True)# 设置defer=True延迟执行# 添加边
- builder.add_edge(START,"a")
- builder.add_edge("a","b")
- builder.add_edge("a","c")
- builder.add_edge("b","b_2")
- builder.add_edge("b_2","d")
- builder.add_edge("c","d")
- builder.add_edge("d", END)# 编译图
- graph = builder.compile()# 执行图print("=== 开始执行工作流 ===")
- result = graph.invoke({})print("=== 执行结果 ===")print(result)
复制代码 Edges
边定义了逻辑的路由方式以及图如何决定停止。这是智能体工作方式以及不同节点之间通信方式的重要组成部分。边有几种关键类型:
Normal Edges: 普通边。直接从一个节点连接到下一个节点。Conditional Edges: 条件边。调用函数以确定接下来要前往哪个(哪些)节点。Entry Point: 入口点。用户输入到达时首先调用哪个节点。Conditional Entry Point: 条件入口点。调用一个函数来确定当用户输入到达时,首先调用哪个(些)节点。
一个节点可以有多个出边。如果一个节点有多个出边,那么所有这些目标节点都将作为下一个超级步骤的一部分并行执行。
** Normal Edges**
Conditional Edges
- from typing import Literal
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- # 定义状态classGraphState(TypedDict):
- value:int
- step:str# 定义节点函数defnode_a(state: GraphState)->dict:print("执行节点A")return{"value": state["value"]+1,"step":"A执行完毕"}defnode_b(state: GraphState)->dict:print("执行节点B")return{"value": state["value"]*2,"step":"B执行完毕"}defnode_c(state: GraphState)->dict:print("执行节点C")return{"value": state["value"]-1,"step":"C执行完毕"}# 条件边的路由函数defroute_condition(state: GraphState)-> Literal["node_b","node_c"]:"""根据value值决定路由到哪个节点"""if state["value"]%2==0:return"node_b_alias"# 偶数路由到节点Belse:return"node_c_alias"# 奇数路由到节点Cdefmain():"""演示条件边"""# 创建图
- builder = StateGraph(GraphState)# 添加节点
- builder.add_node("node_a", node_a)
- builder.add_node("node_b", node_b)
- builder.add_node("node_c", node_c)# 添加边
- builder.add_edge(START,"node_a")# 入口点# 添加条件边
- builder.add_conditional_edges("node_a",# 源节点
- route_condition,# 路由函数{# 路由映射"node_b_alias":"node_b","node_c_alias":"node_c"})# 从B和C到结束
- builder.add_edge("node_b", END)
- builder.add_edge("node_c", END)# 编译图
- graph = builder.compile()# 执行图 - 偶数情况print("输入值为偶数:")
- result = graph.invoke({"value":2})print(f"执行结果: {result}")# 执行图 - 奇数情况print("\n输入值为奇数:")
- result = graph.invoke({"value":1})print(f"执行结果: {result}\n")if __name__ =="__main__":
- main()
复制代码 Entry Point
Conditional Entry Point(入口的条件边)
- from typing import Literal
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- # 定义状态classGraphState(TypedDict):
- value:int
- step:str# 定义节点函数defnode_a(state: GraphState)->dict:print("执行节点A")return{"value": state["value"]+1,"step":"A执行完毕"}defnode_b(state: GraphState)->dict:print("执行节点B")return{"value": state["value"]*2,"step":"B执行完毕"}defnode_d(state: GraphState)->dict:print("执行节点D")return{"value": state["value"]+10,"step":"D执行完毕"}defentry_condition(state: GraphState)-> Literal["node_a","node_d"]:"""根据输入值决定从哪个节点开始"""if state.get("value",0)>5:return"node_d"# 大于5从节点D开始else:return"node_a"# 否则从节点A开始defmain():# 创建图
- builder = StateGraph(GraphState)# 添加节点
- builder.add_node("node_a", node_a)
- builder.add_node("node_d", node_d)
- builder.add_node("node_b", node_b)# 添加条件入口点
- builder.add_conditional_edges(
- START,# 起始点
- entry_condition,# 路由函数{# 路由映射"node_a":"node_a","node_d":"node_d"})# 添加普通边
- builder.add_edge("node_a","node_b")
- builder.add_edge("node_d","node_b")
- builder.add_edge("node_b", END)# 编译图
- graph = builder.compile()# 执行图 - 小于等于5的情况print("输入值小于等于5:")
- result = graph.invoke({"value":3})print(f"执行结果: {result}")# 执行图 - 大于5的情况print("\n输入值大于5:")
- result = graph.invoke({"value":10})print(f"执行结果: {result}\n")if __name__ =="__main__":
- main()
复制代码 创建和控制循环
在创建带有循环的图时,需要一种终止执行的机制。最常见的做法是添加一条条件边,当达到某个终止条件时,该边会路由到END节点。
递归限制设定了图在抛出错误之前允许执行的超级步骤数量,默认值25,在graph.invoke的config参数中指定。在某些应用中,我们无法保证会达到给定的终止条件。在这种情况下,我们可以设置图的递归限制。这将在经过指定数量的超级步骤后引发GraphRecursionError。然后我们可以捕获并处理这个异常。
- from typing import Annotated, Dict, Literal
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.errors import GraphRecursionError
- classLoopState(TypedDict):
- count:int
- result:str
- max_count:intdefnode_a(state: LoopState)->dict:print(f"执行节点a,当前计数: {state['count']}")return{'count': state['count']+1,'result':f"已处理{state['count']}次"}defnode_b(state: LoopState)->dict:print(f"执行节点b,当前计数: {state['count']}")return{'result':f"已处理{state['count']}次 - 辅助处理"}defroute(state: LoopState)-> Literal["b", END]:# 终止条件:当计数达到最大值时终止if state['count']>= state['max_count']:print(f"满足终止条件,计数 {state['count']} >= {state['max_count']},返回END")return END
- else:print(f"未满足终止条件,计数 {state['count']} < {state['max_count']},返回b")return"b"# 创建图
- builder = StateGraph(LoopState)# 添加节点
- builder.add_node("a", node_a)
- builder.add_node("b", node_b)# 添加边
- builder.add_edge(START,"a")
- builder.add_conditional_edges("a", route)
- builder.add_edge("b","a")# 编译图
- graph = builder.compile()# 执行图try:
- result = graph.invoke(input={'count':0,'result':'','max_count':3}, config={'recursion_limit':6# 设置递归限制})print("=== 执行结果 ===")print(result)except GraphRecursionError as e:print(f"递归错误: {e}")
复制代码 Send
在传统的图结构中,节点和边都是预先定义好的,但在某些场景下,我们需要动态地根据运行时状态来决定执行哪些节点。Map-Reduce 模式就是这样一个典型场景:
一个节点生成一个动态数量的对象列表。另一个节点需要对列表中的每个对象进行处理。最终将所有处理结果合并。
自动批量分发、自动并行、自动合并结果 。为了支持这种设计模式,LangGraph 支持从条件边返回 Send 对象。Send 接受两个参数:
第一个是节点的名称。第二个是要传递给该节点的状态。
- from typing import Annotated, List, Sequence
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import Send
- # 定义状态classOverallState(TypedDict):
- subjects: List[str]
- jokes: Annotated[List[str],lambda x, y: x + y]# 使用列表合并的方式# 第一个节点:生成需要处理的主题列表defgenerate_subjects(state: OverallState)->dict:"""生成需要处理的主题列表"""print("执行节点: generate_subjects")
- subjects =["猫","狗","程序员"]print(f"生成主题列表: {subjects}")return{"subjects": subjects}# Map节点:为每个主题生成笑话defmake_joke(state: OverallState)->dict:"""为单个主题生成笑话"""
- subject = state.get("subject","未知")print(f"执行节点: make_joke,处理主题: {subject}")# 根据主题生成相应笑话
- jokes_map ={"猫":"为什么猫不喜欢在线购物?因为它们更喜欢实体店!","狗":"为什么狗不喜欢计算机?因为它们害怕被鼠标咬!","程序员":"为什么程序员喜欢洗衣服?因为他们在寻找bugs!","未知":"这是一个关于未知主题的神秘笑话。"}
-
- joke = jokes_map.get(subject,f"这是一个关于{subject}的即兴笑话。")print(f"生成笑话: {joke}")return{"jokes":[joke]}# 条件边函数:根据主题列表生成Send对象列表defmap_subjects_to_jokes(state: OverallState)-> Sequence[Send]:"""将主题列表映射到joke生成任务"""print("执行条件边函数: map_subjects_to_jokes")
- subjects = state["subjects"]print(f"映射主题到joke任务: {subjects}")# 为每个主题创建一个Send对象,指向make_joke节点# 每个Send对象包含节点名称和传递给该节点的状态
- send_list =[Send("make_joke",{"subject": subject})for subject in subjects]print(f"生成Send对象列表: {send_list}")return send_list
- defmain():"""演示Map-Reduce模式"""print("=== Map-Reduce 模式演示 ===\n")# 创建图
- builder = StateGraph(OverallState)# 添加节点
- builder.add_node("generate_subjects", generate_subjects)
- builder.add_node("make_joke", make_joke)# 添加边
- builder.add_edge(START,"generate_subjects")# 添加条件边,使用Send对象实现map-reduce
- builder.add_conditional_edges("generate_subjects",# 源节点
- map_subjects_to_jokes # 路由函数,返回Send对象列表)# 从make_joke到结束
- builder.add_edge("make_joke", END)# 编译图
- graph = builder.compile()# 执行图
- initial_state ={"subjects":[],"jokes":[]}print("初始状态:", initial_state)print("\n开始执行图...")
-
- result = graph.invoke(initial_state)print(f"\n最终结果: {result}")print("\n=== 演示完成 ===")if __name__ =="__main__":
- main()
复制代码 Command
Command基本用法
将控制流(边)和状态更新(节点)结合起来可能会很有用。例如,你可能希望在同一个节点中既执行状态更新,又决定下一步前往哪个节点。LangGraph 提供了一种实现方式,即从节点函数返回一个 Command 对象。
借助Command,可以实现动态控制流行为(与条件边相同):根据消息内容动态决定执行路径,并在节点中同时更新状态和控制流程。
- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import Command
- # 定义状态classAgentState(TypedDict):
- messages: Annotated[list,lambda x, y: x + y]
- current_agent:str
- task_completed:bool# 节点函数:决策代理defdecision_agent(state: AgentState)-> Command[AgentState]:print("执行节点: decision_agent")# 检查最新的消息
- last_message = state["messages"][-1]if state["messages"]else""print(f"最新消息: {last_message}")if"数学"in last_message:return Command(
- update={"messages":[("system","路由到数学代理")],"current_agent":"math_agent"},
- goto="math_agent")elif"翻译"in last_message:return Command(
- update={"messages":[("system","路由到翻译代理")],"current_agent":"translation_agent"},
- goto="translation_agent")else:return Command(
- update={"messages":[("system","任务完成")],"task_completed":True},
- goto=END
- )defmath_agent(state: AgentState)-> Command[AgentState]:print("执行节点: math_agent")
- result ="2 + 2 = 4"print(f"计算结果: {result}")# 更新状态并返回决策代理return Command(
- update={"messages":[("assistant",f"数学计算结果: {result}")],"current_agent":"decision_agent"},
- goto="decision_agent")deftranslation_agent(state: AgentState)-> Command[AgentState]:print("执行节点: translation_agent")
- translation ="Hello -> 你好"print(f"翻译结果: {translation}")# 更新状态并返回决策代理return Command(
- update={"messages":[("assistant",f"翻译结果: {translation}")],"current_agent":"decision_agent"},
- goto="decision_agent")defmain():# 创建图
- builder = StateGraph(AgentState)# 添加节点
- builder.add_node("decision_agent", decision_agent)
- builder.add_node("math_agent", math_agent)
- builder.add_node("translation_agent", translation_agent)# 设置入口点
- builder.add_edge(START,"decision_agent")# 编译图
- graph = builder.compile()# 执行图 - 测试数学任务print("测试1: 数学任务")
- initial_state ={"messages":[("user","我需要计算数学题")],"current_agent":"user","task_completed":False}print("初始状态:", initial_state)
- result = graph.invoke(initial_state)print("最终状态:", result)print("\n"+"="*50+"\n")# 执行图 - 测试翻译任务print("测试2: 翻译任务")
- initial_state ={"messages":[("user","我需要翻译文本")],"current_agent":"user","task_completed":False}print("初始状态:", initial_state)
- result = graph.invoke(initial_state)print("最终状态:", result)print("\n"+"="*50+"\n")# 执行图 - 测试完成任务print("测试3: 完成任务")
- initial_state ={"messages":[("user","你好")],"current_agent":"user","task_completed":False}print("初始状态:", initial_state)
- result = graph.invoke(initial_state)print("最终状态:", result)if __name__ =="__main__":
- main()
复制代码 Command vs 条件边
应该在什么时候使用Command而不是条件边?
使用Command:当你需要在更新状态的同时决定下一步执行哪个节点时。例如,在多智能体系统中,需要在切换智能体的同时传递信息。使用条件边:当你只需要在节点之间有条件地路由而不需要更新状态时。
父图导航
使用子图时,可能希望从子图内的某个节点导航到另一个子图(即父图中的另一个节点)。这在实现多智能体交接时特别有用。(跳出子图到父图中的节点)
要实现这一点,可以在Command中指定graph=Command.PARENT:
将graph设置为Command.PARENT将导航到最近的父图。当从子图节点向父图节点发送更新,且更新的键是父图和子图的共享的状态模式时,必须为父图状态中要更新的键定义一个reducer。
子图节点如何通过Command对象导航回父图,并更新父图的状态:
- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import Command
- classParentState(TypedDict):
- messages: Annotated[list,lambda x, y: x + y]
- task_status:str
- subtask_result:strclassChildState(TypedDict):
- messages: Annotated[list,lambda x, y: x + y]
- task_status:str
- subtask_result:str
- child_data:strdefmain_controller(state: ParentState)-> Command[ParentState]:print("执行节点: main_controller (父图)")return Command(
- update={"messages":[("system","启动子任务")],"task_status":"subtask_started"},
- goto="subgraph_node")deftask_finisher(state: ParentState)->dict:print("执行节点: task_finisher (父图)")return{"messages":[("system","任务完成")],"task_status":"completed"}# 子图节点:数据处理器defdata_processor(state: ChildState)-> Command[ParentState]:print("执行节点: data_processor (子图)")
- processed_data ="处理后的数据"print(f"处理结果: {processed_data}")# 导航回父图的task_finisher节点return Command(
- update={"messages":[("subtask",f"子任务完成: {processed_data}")],"subtask_result": processed_data,"task_status":"subtask_completed"},
- goto="task_finisher",
- graph=Command.PARENT # 指定导航到父图)defcreate_subgraph():"""创建子图"""
- subgraph_builder = StateGraph(ChildState)
- subgraph_builder.add_node("data_processor", data_processor)
- subgraph_builder.add_edge(START,"data_processor")
- subgraph_builder.add_edge("data_processor", END)return subgraph_builder.compile()# 编译子图defmain():# 创建父图
- parent_builder = StateGraph(ParentState)
- parent_builder.add_node("main_controller", main_controller)
- parent_builder.add_node("task_finisher", task_finisher)
- parent_builder.add_node("subgraph_node", create_subgraph())# 添加子图作为节点
- parent_builder.add_edge(START,"main_controller")
- parent_builder.add_edge("main_controller","subgraph_node")# 编译图
- graph = parent_builder.compile()# 执行图
- initial_state ={"messages":[("user","开始任务")],"task_status":"init","subtask_result":""}print("初始状态:", initial_state)
- result = graph.invoke(initial_state)print("最终状态:", result)if __name__ =="__main__":
- main()
复制代码 Runtime Context
创建Graph时,可以为传递给节点的运行时上下文指定一个context_schema。这对于向节点传递不属于图状态的信息很有用。例如,可以传递诸如模型名称或数据库连接之类的依赖项。
使用 context_schema 的优势:
(1)分离关注点:将运行时配置与图状态分离,保持状态的纯净性
(2)类型安全:通过定义数据类,提供类型检查和 IDE 自动补全支持
(3)易于管理:统一管理运行时依赖,如数据库连接、API密钥等
适用场景包括:
(1)传递模型配置信息(如模型名称、参数等)
(2)传递数据库连接、API密钥等敏感信息
(3)在不同环境中动态切换配置
(4)传递用户身份信息或其他运行时上下文
使用方式
(1)Context Schema(上下文结构)
使用 @dataclass 定义了一个 ContextSchema 类,定义的内容不属于图的状态,但在运行时需要被节点访问
(2)节点函数定义
节点函数接收两个参数:state(图的状态)和 runtime(运行时上下文)通过 runtime.context 访问上下文信息,如 runtime.context.model_name
(3)图的创建和执行
创建 StateGraph 时传入 context_schema=ContextSchema 参数调用 graph.invoke() 时通过 context 参数传递上下文数据
| 维度 | Runtime Context | State (状态) | | 数据性质 | 静态配置、依赖、元数据(不变) | 业务数据、中间结果、历史(可变) | | 修改权限 | 只读,不可更新 | 可读写,节点可通过返回 dict/Command 更新 | | 存储目的 | 控制如何执行(环境 / 配置) | 存储执行结果(业务数据) | | 典型场景 | user_id、db 连接、llm 模型名、API Key | messages、count、jokes、处理进度 |
- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.runtime import Runtime
- from langchain_core.messages import AIMessage, HumanMessage
- from dataclasses import dataclass
- # 定义状态结构classAgentState(TypedDict):
- messages: Annotated[list,lambda x, y: x + y]
- response:str# 定义上下文结构@dataclassclassContextSchema:
- model_name:str
- db_connection:str
- api_key:str# 节点函数:处理用户消息defprocess_message(state: AgentState, runtime: Runtime[ContextSchema])->dict:"""处理用户消息的节点,使用context中的信息"""print("执行节点: process_message")# 获取最新的用户消息
- last_message = state["messages"][-1].content if state["messages"]else""print(f"用户消息: {last_message}")# 使用runtime.context中的信息
- model_name = runtime.context.model_name
- db_connection = runtime.context.db_connection
- api_key = runtime.context.api_key
- print(f"使用的模型: {model_name}")print(f"数据库连接: {db_connection}")print(f"API密钥前缀: {api_key[:5]}***")# 只显示前5位,隐藏其余部分# 模拟使用这些信息处理请求
- response =f"使用 {model_name} 处理了您的请求,已连接到 {db_connection}"return{"messages":[AIMessage(content=response)],"response": response
- }# 节点函数:生成最终响应defgenerate_response(state: AgentState, runtime: Runtime[ContextSchema])->dict:"""生成最终响应的节点"""print("执行节点: generate_response")# 使用runtime.context中的信息
- model_name = runtime.context.model_name
- print(f"使用模型 {model_name} 生成最终响应")# 获取之前的结果
- previous_response = state["response"]# 生成更详细的响应
- final_response =f"{previous_response}\n\n这是使用 {model_name} 生成的完整响应。"return{"messages":[AIMessage(content=final_response)],"response": final_response
- }defmain():# 创建图,指定state_schema和context_schema
- builder = StateGraph(AgentState, context_schema=ContextSchema)# 添加节点
- builder.add_node("process_message", process_message)
- builder.add_node("generate_response", generate_response)# 添加边
- builder.add_edge(START,"process_message")
- builder.add_edge("process_message","generate_response")
- builder.add_edge("generate_response", END)# 编译图
- graph = builder.compile()# 定义初始状态
- initial_state ={"messages":[HumanMessage(content="请帮我查询最新的订单信息")],"response":""}# 定义上下文
- context = ContextSchema(
- model_name="gpt-4-turbo",
- db_connection="postgresql://user:pass@localhost:5432/orders_db",
- api_key="sk-abcdefghijklmnopqrstuvwxyz123456")print("初始状态:", initial_state)print("上下文信息:",{"model_name": context.model_name,"db_connection": context.db_connection,"api_key":f"{context.api_key[:5]}***"})print("\n"+"-"*50+"\n")# 执行图,通过context参数传递上下文
- result = graph.invoke(initial_state, context=context)print("\n"+"="*50)print("最终状态:", result)print("\n最终响应:")print(result["response"])if __name__ =="__main__":
- main()
复制代码 可视化
LangGraph 提供了多种图表可视化方式,帮助开发者更好地理解和调试工作流。
通过 graph.get_graph() 方法可以获取图的结构信息,包括节点和边的详细信息。基于这个信息,可以使用如下方式进行可视化:
生成 Mermaid 代码来可视化图结构。生成简单的 ASCII 文本图表,但需要安装额外的依赖。
- # 安装grandalf 来支持 ASCII 可视化。
- pip install grandalf
复制代码 打印的Mermaid代码可以用支持的工具进行展示,或者用在线网站查看效果(https://mermaid.live/)
高级特性
持久化(Persistence)
LangGraph 具有内置的持久化层,通过检查点工具来实现。当使用检查点工具编译图时,检查点工具会在每个超级步骤保存图状态的checkpoint。这些检查点会被保存到一个thread中,在图执行后可以访问该线程。由于threads允许在执行后访问图的状态,因此多种强大功能成为可能,包括人机协同、记忆、时间回溯和容错等。
**LangGraph API 会自动处理检查点。**使用 LangGraph API 时,无需手动实现或配置检查点工具。该 API 会在后台为您处理所有持久化基础架构。
Threads
线程(Thread)是检查点工具为其保存的每个检查点分配的唯一 ID 或线程标识符,它包含了一系列运行实例的累积状态。当某个运行实例执行时,底层图的状态会持久化到该线程中。使用检查点调用图时,必须在配置的可配置部分指定thread_id。- {"configurable":{"thread_id":"1"}}
复制代码 可以检索线程的当前状态和历史状态。要持久化状态,必须在执行运行之前创建线程。
Checkpoints
线程在特定时间点的状态称为检查点。检查点是在每个超级步骤保存的图状态快照,由具有以下关键属性的StateSnapshot对象表示:
config:与此检查点相关联的配置。metadata:与此检查点相关联的元数据。values:此时状态通道的值。next 下一个要在图中执行的节点名称元组。tasks:一个包含PregelTask对象的元组,这些对象包含关于接下来要执行的任务的信息。如果该步骤之前已尝试过,它将包含错误信息。如果图形从节点内部被动态中断,任务将包含与中断相关的额外数据。
检查点会被持久化,并可用于在之后恢复线程的状态。
graph.get_state(config):来查看图的最新状态。graph.get_state_history(config):可以获取特定线程的图执行完整历史。
内存检查点
langgraph-checkpoint:检查点保存器(BaseCheckpointSaver)的基础接口以及序列化/反序列化接口(SerializerProtocol)。包含用于实验的内存中检查点实现(InMemorySaver)。LangGraph 已内置 langgraph-checkpoint。
- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph import StateGraph, START, END
- from langgraph.checkpoint.memory import InMemorySaver
- import operator
- # 定义状态classPersistenceDemoState(TypedDict):
- messages: Annotated[list, operator.add]
- step_count: Annotated[int, operator.add]# 节点函数defstep_one(state: PersistenceDemoState)->dict:print("执行步骤 1")return{"messages":["执行了步骤 1"],"step_count":1}defstep_two(state: PersistenceDemoState)->dict:print("执行步骤 2")return{"messages":["执行了步骤 2"],"step_count":1}defstep_three(state: PersistenceDemoState)->dict:print("执行步骤 3")return{"messages":["执行了步骤 3"],"step_count":1}# 构建图defcreate_graph():
- builder = StateGraph(PersistenceDemoState)
- builder.add_node("step_one", step_one)
- builder.add_node("step_two", step_two)
- builder.add_node("step_three", step_three)
- builder.add_edge(START,"step_one")
- builder.add_edge("step_one","step_two")
- builder.add_edge("step_two","step_three")
- builder.add_edge("step_three", END)return builder
- defmain():# 创建内存存储器
- memory = InMemorySaver()# 编译图并使用内存存储
- graph = create_graph()
- app = graph.compile(checkpointer=memory)# 配置线程ID用于存储状态
- config ={"configurable":{"thread_id":"demo_thread_1"}}print("1. 首次执行工作流:")
- result = app.invoke({"messages":["开始执行"],"step_count":0}, config)print(f"执行结果: {result}\n")print("2. 检查存储的状态:")
- saved_state = app.get_state(config)print(f"保存的状态: {saved_state.values}")print(f"下一个节点: {saved_state.next}\n")print("3. 恢复执行工作流:")# 由于工作流已经完成,这里会直接返回最终结果
- result2 = app.invoke(None, config)print(f"恢复执行结果: {result2}\n")if __name__ =="__main__":
- main()
复制代码 数据库检查点
在底层,检查点功能由符合BaseCheckpointSaver接口的检查点对象提供支持。LangGraph提供了多种检查点实现,所有这些实现都通过独立的、可安装的库来完成,数据库类型的有:
langgraph-checkpoint-sqlite:使用SQLite数据库(SqliteSaver / AsyncSqliteSaver)存储检查点。非常适合实验和本地工作流程。需要单独安装。langgraph-checkpoint-postgres:使用Postgres数据库(PostgresSaver / AsyncPostgresSaver)存储检查点,用于LangSmith。非常适合在生产环境中使用。需要单独安装。
持久化执行(Durable execution)
LangGraph 中的 “持久化执行” 指:即便在工作流执行过程中遭遇系统崩溃、网络中断或服务重启等意外状况,仍能确保工作流从断点处继续执行,而非从头重新运行的能力。它是对 “持久化(Persistence)” 能力的延伸 —— 持久化侧重 “状态保存”,而持久化执行侧重 “基于已保存的状态实现可靠续跑”。
持久执行是一种流程或工作流在关键节点保存进度的技术,它允许流程暂停,之后能从暂停的精确位置继续执行。这在需要人机协同的场景中尤为有用,在这类场景中,用户可以在继续执行前检查、验证或修改流程;同时,在可能遇到中断或错误(例如,调用大语言模型超时)的长时间运行任务中也很有帮助。通过保留已完成的工作,持久执行使流程能够在无需重新处理先前步骤的情况下恢复——即使经过较长时间的延迟(例如,一周后)。
LangGraph 的内置持久化层为工作流提供持久执行能力,确保每个执行步骤的状态都能保存到持久化存储中。这一功能保证,无论工作流因系统故障还是人机交互而中断,都能从最后记录的状态恢复执行。
确定性与一致重放
当你恢复工作流运行时,代码不会从执行停止的同一行代码处恢复;相反,它会确定一个合适的起始点,从该点继续执行。这意味着工作流将从起始点重新执行所有步骤,直到到达停止的位置。
因此,当你为持久化执行编写工作流时,必须将任何非确定性操作(例如随机数生成)以及任何具有副作用的操作(例如文件写入、API调用)封装在任务或节点中。
为确保工作流程具有确定性且能够被一致地重放,确保以下几点:
**1)**避免重复工作
如果一个节点包含多个具有副作用的操作(例如,日志记录、文件写入或网络调用),请将每个操作包装在单独的任务中。这确保了在工作流恢复时,这些操作不会被重复执行,并且它们的结果会从持久化层中检索。
**2)**封装非确定性操作
将任何可能产生非确定性结果的代码(例如随机数生成)包装在任务或节点中。这确保了在恢复时,工作流会按照确切记录的步骤序列执行,并得到相同的结果。
**3)**使用幂等操作
在可能的情况下,确保副作用(例如API调用、文件写入)是幂等的。这意味着,如果某个操作在工作流中失败后重试,其效果将与第一次执行时相同。这对于导致数据写入的操作尤为重要。如果某个任务开始执行但未能成功完成,工作流的恢复将重新运行该任务,并依靠已记录的结果来保持一致性。使用幂等键或验证现有结果,以避免意外的重复,确保工作流执行顺畅且可预测。
持久性模式
LangGraph支持三种持久性模式,从持久性最低到最高的模式如下:
更高的持久性模式会给工作流执行增加更多开销。
调用任何图执行方法时,你可以指定持久性模式:- graph.stream({"input":"test"},
- durability="sync")
复制代码 **1)**exit
只有当图执行完成(无论是成功完成还是出现错误)时,更改才会被持久化。这为长时间运行的图提供了最佳性能,但意味着中间状态不会被保存,因此您无法从中途执行失败中恢复,也无法中断图的执行。
**2)**async
在执行下一步时,变更会异步持久化。这提供了良好的性能和耐久性,但存在一个小风险:如果进程在执行期间崩溃,检查点可能无法写入。
**3)**sync
在下一个步骤开始前,变更会被同步持久化。这确保了每个检查点都在继续执行前写入,以一定的性能开销为代价提供了高持久性。
恢复工作流
在工作流中启用持久化执行后,可以在以下场景中恢复执行:
暂停和恢复工作流:使用中断函数在特定点暂停工作流,并使用Command原语通过更新的状态恢复工作流(参见Graph API中的3.5.5的示例)。
从故障中恢复:发生异常(例如,大语言模型提供商中断)后,自动从最后一个成功的检查点恢复工作流。这需要通过提供None作为输入值,使用相同的线程标识符执行工作流(参见函数式API中5.8的示例)。
流处理(Streaming)
LangGraph 实现了一个流式传输系统,以呈现实时更新。通过逐步显示输出内容(即便完整响应尚未生成),流式传输能显著提升用户体验(UX),尤其在应对大语言模型(LLMs)的延迟问题时效果突出。
LangGraph 流式传输可以实现的功能:
图状态流式输出:通过updates和values模式获取图状态(也即通过state_schema指定的状态)的更新。子图流式输出:将所有父图和嵌套子图输出内容进行流式输出。大模型tokens流式输出:捕获来自于工具调用,子节点,子图等地方所有的token,并进行流式输出。流式传输自定义数据:直接从工具函数发送自定义更新或进度信号。使用多种流模式:可从values(完整状态)、updates(状态增量)、messages(大语言模型tokens+元数据)、custom(任意用户数据)或debug(详细跟踪)中选择多种模式进行流式输出。
支持的流模式
将以下一种或多种流模式以列表形式传递给stream或astream方法:
| 模式 | 描述 | | values | 在图的每一步之后流式传输状态的完整值。 | | updates | 在图的每一步之后流式传输状态更新。如果在同一步骤中进行了多项更新(例如,运行了多个节点),这些更新将被单独流式传输。 | | custom | 从图节点内部流式传输自定义数据。 | | messages | 从任何调用了大语言模型的图节点流式传输二元组(大语言模型token,元数据)。 | | debug | 在图的整个执行过程中尽可能多地流式传输信息。 | 基本用法
LangGraph有stream(同步)和astream(异步)方法,以迭代器的形式生成流式输出。
**1)**多模式流
将列表作为stream_mode参数传递,以同时流式传输多种模式。流式输出将是(mode, chunk)形式的元组,其中mode是流模式的名称,chunk是该模式所流式传输的数据。
**2)**流图状态
使用流模式updates和values来流式传输图在执行时的状态。
**3)**调试
使用debug流模式,在图的执行过程中流式传输尽可能多的信息。流式输出包括节点名称以及完整状态。
**4)**流式输出子图结果
要在流式输出中包含子图的输出,可以在父图的.stream()方法中设置subgraphs=True。这样将同时流式传输父图和所有子图的输出。
- from typing import TypedDict
- from langgraph.graph import StateGraph, START, END
- # 定义状态类型classState(TypedDict):
- question:str
- answer:str
- confidence:float
- steps:listdefthink(state: State):
- question = state["question"]
- steps =[f"分析问题: {question}","检索相关知识","形成初步答案"]return{"steps": steps}defrespond(state: State):
- question = state["question"]if"天气"in question:
- answer ="今天天气晴朗"
- confidence =0.9elif"时间"in question:
- answer ="现在是上午10点"
- confidence =0.8else:
- answer ="这是一个很好的问题"
- confidence =0.7return{"answer": answer,"confidence": confidence
- }defreflect(state: State):
- answer = state["answer"]
- confidence = state["confidence"]
- steps = state.get("steps",[])
- steps.append(f"验证答案: {answer}")
- steps.append(f"置信度评估: {confidence}")if confidence >0.8:
- conclusion ="高置信度答案"elif confidence >0.5:
- conclusion ="中等置信度答案"else:
- conclusion ="低置信度答案"
- steps.append(f"结论: {conclusion}")return{"steps": steps}defmain():# 构建图
- builder = StateGraph(State)
- builder.add_node("think", think)
- builder.add_node("respond", respond)
- builder.add_node("reflect", reflect)
- builder.add_edge(START,"think")
- builder.add_edge("think","respond")
- builder.add_edge("respond","reflect")
- builder.add_edge("reflect", END)
- graph = builder.compile()
- input_state ={"question":"今天天气怎么样?","answer":"","confidence":0.0,"steps":[]}print("--- 1. 使用 stream_mode='values' 模式 ---")print("显示每一步执行后的完整状态:")for chunk in graph.stream(input_state, stream_mode="values"):print(f" {chunk}")print("\n"+"="*60+"\n")print("--- 2. 使用 stream_mode='updates' 模式 ---")print("只显示每一步的状态更新:")for chunk in graph.stream(input_state, stream_mode="updates"):print(f" {chunk}")print("\n"+"="*60+"\n")print("--- 3. 同时使用多种流模式 ---")print("同时显示完整状态和状态更新:")for mode, chunk in graph.stream(input_state, stream_mode=["values","updates"]):print(f" [{mode}]: {chunk}")print("\n"+"="*60+"\n")print("--- 4. 使用 debug 模式 ---")print("显示详细的调试信息:")try:for chunk in graph.stream(input_state, stream_mode="debug"):print(f" {chunk}")except Exception as e:print(f" Debug模式可能需要特殊配置: {e}")if __name__ =="__main__":
- main()
复制代码 流式输出LLM响应
使用messages流模式,从图中的任何部分(包括节点、工具、子图或任务)逐token流式传输大型语言模型(LLM)的输出。
messages模式的流式输出是一个元组(message_chunk, metadata),其中:
message_chunk:来自大语言模型(LLM)的令牌或消息片段。metadata:一个包含图节点和大语言模型调用详情的字典。
- from typing import TypedDict
- from langgraph.graph import StateGraph,START
- from langchain.chat_models import init_chat_model
- import dotenv
- dotenv.load_dotenv()
- model = init_chat_model(model="qwen3-vl-plus",model_provider="openai")classState(TypedDict):
- query:str
- answer:strdefnode(state:State):print("开始调用node节点")
- llm_result = model.invoke([("user",state["query"])])print("llm invoke结束")return{"answer":llm_result}defmain():
- graph =(
- StateGraph(
- state_schema=State
- ).add_node(node).add_edge(START,"node").compile())
- inputs ={"query":"帮我生成一个300字的小学生作文,主题为我的一天"}#for chunk,meta_data in graph.stream(inputs,stream_mode="messages"):print(chunk.content,end="")if __name__ =='__main__':
- main()
复制代码 流式传输自定义数据
要从LangGraph节点或工具内部发送自定义用户定义数据,请遵循以下步骤:
使用get_stream_writer访问流写入器并发送自定义数据。调用.stream()或.astream()时,设置stream_mode=“custom"以在流中获取自定义数据。你可以组合多种模式(例如[“updates”, “custom”]),但至少有一种模式必须是"custom”。
- from typing import TypedDict
- from langgraph.config import get_stream_writer
- from langgraph.graph import StateGraph, START, END
- classState(TypedDict):
- query:str
- answer:str
- progress:listdefnode_with_custom_streaming(state: State):
- writer = get_stream_writer()
- writer({"custom_key":"开始处理查询"})
- writer({"progress":"步骤1: 分析查询内容","status":"running"})
- query = state["query"]# 模拟处理过程
- result =f"处理结果: {query.upper()}"
- writer({"progress":"步骤2: 生成结果","status":"running"})
- writer({"progress":"步骤3: 完成处理","status":"completed"})
- writer({"custom_key":"查询处理完成"})return{"answer": result,"progress": state.get("progress",[])+["处理完成"]}defmain():
- graph =(
- StateGraph(State).add_node("node_with_custom_streaming", node_with_custom_streaming).add_edge(START,"node_with_custom_streaming").add_edge("node_with_custom_streaming", END).compile())
- inputs ={"query":"hello world","answer":"","progress":[]}print("--- 1. 使用 custom 流模式 ---")try:# 设置 stream_mode="custom" 以在流中接收自定义数据for chunk in graph.stream(inputs, stream_mode="custom"):print(f"自定义数据块: {chunk}")except Exception as e:print(f"错误: {e}")print("说明: 在Graph API中,自定义流数据需要在节点中通过特定方式发送")print("\n"+"="*50+"\n")print("--- 2. 使用 updates 流模式 ---")for chunk in graph.stream(inputs, stream_mode="updates"):print(f"状态更新: {chunk}")print("\n"+"="*50+"\n")print("--- 3. 同时使用 custom 和 updates 流模式 ---")try:for mode, chunk in graph.stream(inputs, stream_mode=["custom","updates"]):print(f"[{mode}]: {chunk}")except Exception as e:print(f"错误: {e}")print("说明: 在Graph API中,需要特殊配置才能使用自定义流模式")if __name__ =="__main__":
- main()
复制代码 中断(Interrupts)
中断允许在特定点暂停图的执行,并在继续之前等待外部输入。这支持了“人在回路”模式,即需要外部输入才能继续的场景。当中断被触发时,LangGraph会利用其持久化层保存图的状态,并无限期等待,直到恢复执行。
通过在图形节点中的任意位置调用interrupt()函数,即可实现中断功能。该函数接受任何可序列化为JSON的值,并将其提供给调用者。
当需要从中断点继续时,只需调用Command即可。该命令的resume参数将会作为interrupt()函数的返回值,graph随之从当前点继续往下执行。
与静态断点(在特定节点之前或之后暂停,调用compile()时,所传递的interrupt_before / interrupt_after 参数)不同,中断是动态的——它们可以放在代码中的任何位置,并且可以根据应用程序逻辑设置为条件性的。
使用interrupt暂停
interrupt函数会暂停图的执行,并向调用者返回一个值。当在节点内调用interrupt时,LangGraph会保存当前的图状态,并等待用户通过输入来恢复执行。
使用interrrupt时,需要使用checkpointer来保存图状态,langgraph会将图状态保存到特定的thread ID当中。
当调用interrupt时,会发生以下情况:
图执行会在调用interrupt的确切位置暂停。保存状态,以便之后可以恢复执行。值会被返回给处于__interrupt__状态下的调用方;该值可以是任何可序列化为JSON的值(字符串、对象、数组等)。图会无限期等待,直到你通过响应恢复执行当恢复时,响应会传递回节点,成为interrupt()调用的返回值。
- from typing import TypedDict,Annotated
- from langgraph.types import interrupt
- from langgraph.graph import StateGraph,START,END
- from langgraph.checkpoint.memory import MemorySaver
- classMyState(TypedDict):
- state_1:str
- state_2:Annotated[list,lambda x,y:x+y]defnode_1(state:MyState):print("entering node_1")
- res = interrupt({"key_1":"value_1","key_2":"value_2"})return{"state_2":res}
- graph = StateGraph(MyState)
- graph.add_node(node_1)
- graph.add_edge(START,"node_1")
- graph.add_edge("node_1",END)
- checkpointer = MemorySaver()
- graph = graph.compile(checkpointer=checkpointer)
- config ={"configurable":{"thread_id":1}}
- invoke_result = graph.invoke({"state_1":"test","state_2":["1"]},
- config=config
- )# 打印结果:[Interrupt(value={'key_1': 'value_1', 'key_2': 'value_2'}, id='d6cb4b6d0bc74b831f81861a50187c87')]print(invoke_result['__interrupt__'])
复制代码 恢复中断
当中断暂停执行后,可以通过再次调用图并传入包含恢复值的Command来恢复图的运行。恢复值会被传回interrupt调用,使节点能够利用外部输入继续执行。
关于恢复的要点:
恢复时必须使用与中断发生时相同的线程ID。传递给Command(resume=…)的值会成为interrupt调用的返回值。节点在恢复时会从调用interrupt的节点开头重新启动,因此interrupt之前的所有代码会再次运行。可以将任何可JSON序列化的值作为恢复值传递。
- from langgraph.types import Command
- #打印结果: {'state_1': 'test', 'state_2': ['1', 'the value returned to interrupt invoke']}
- graph.invoke(Command(resume=["the value returned to interrupt invoke"]),config=config)
复制代码 审批工作流
中断最常见的用途之一是在执行关键操作前暂停并请求批准。例如,希望让用户批准一项API调用、一次数据库更改或其他任何高风险操作。
- from typing import Literal, Optional, TypedDict
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import Command, interrupt
- classApprovalState(TypedDict):
- action_details:str
- status:Optional[Literal['pending','approved','rejected']]defapproval_node(state:ApprovalState)->Command[Literal['proceed','cancel']]:print(f"执行节点: approval_node")print(f"操作详情: {state['action_details']}")print("工作流暂停,等待用户审批...")
- decision=interrupt({'question':'批准操作吗?','details':state['action_details']})
- next_node='proceed'if decision else'cancel'print(f"审批决定: {'批准'if decision else'拒绝'},路由到节点: {next_node}")return Command(goto=next_node)defproceed_node(state:ApprovalState):print("执行节点: proceed_node")print("操作已被批准,正在执行...")return{"status":"approved"}defcancel_node(state: ApprovalState):print("执行节点: cancel_node")print("操作已被拒绝,正在取消...")return{"status":"rejected"}defmain():
- builder= StateGraph(ApprovalState)
- builder.add_node('approval',approval_node)
- builder.add_node('proceed',proceed_node)
- builder.add_node('cancel',cancel_node)
- builder.add_edge(START,'approval')
- builder.add_edge('proceed',END)
- builder.add_edge('cancel',END)
- checkpointer=MemorySaver()
- graph=builder.compile(checkpointer=checkpointer)
- config={'configurable':{'thread_id':'hello123'}}print("1. 启动审批工作流...")
- initial = graph.invoke({"action_details":"转账 $500","status":"pending"},
- config=config,)print(f"工作流中断信息: {initial['__interrupt__']}\n")# 模拟用户审批过程print("2. 模拟用户审批过程...")
- interrupt_value=initial['__interrupt__'][0].value
- print("操作详情:", interrupt_value["details"])print("问题:", interrupt_value["question"])whileTrue:
- user_input =input("\n请输入审批决定 (y/n): ").strip().lower()if user_input in['y','yes','是']:
- decision =Truebreakelif user_input in['n','no','否']:
- decision =Falsebreakelse:print("无效输入,请输入 y/yes/是 或 n/no/否")# 使用用户决定恢复执行print(f"\n3. 使用审批决定恢复工作流执行...")
- resumed=graph.invoke(Command(resume=decision),config=config)print(f"最终状态: {resumed}")print(f"操作状态: {resumed['status']}")if __name__ =="__main__":
- main()
复制代码 工具中的中断
将中断直接放在工具函数内部。这会使工具在每次被调用时暂停以等待批准,并允许在执行工具调用之前进行人工检查和编辑。- from typing import TypedDict
- from langchain.tools import tool
- from langgraph.checkpoint.memory import MemorySaver
- from langgraph.graph import StateGraph, START, END
- from langgraph.types import Command, interrupt
- classAgentState(TypedDict):
- messages:list[dict]@tooldefsend_email(to:str,subject:str,body:str):'发邮件给收件人'print(f"执行工具: send_email")print(f"收件人: {to}")print(f"主题: {subject}")print(f"正文: {body}")
- response = interrupt({"action":"send_email","to": to,"subject": subject,"body": body,"message":"是否批准发送此邮件?",})if response.get('action')=='approve':
- final_to=response.get('to',to)
- final_subject = response.get("subject", subject)
- final_body = response.get("body", body)# 实际发送邮件(这里只是模拟)print(f"[send_email] to={final_to} subject={final_subject} body={final_body}")returnf"邮件已发送至 {final_to}"return"用户取消了邮件发送"defagent_node(state: AgentState):print("执行节点: agent_node")iflen(state['messages'])==1:
- tool_call={"name":"send_email","arguments":{"to":"alice@example.com","subject":"会议安排","body":"你好,我想安排一个会议讨论项目进展。"}}try:
- result=send_email.invoke(tool_call['arguments'])return{"messages": state["messages"]+[{"role":"assistant","content":f"调用工具: {tool_call['name']}"},{"role":"tool","name": tool_call["name"],"content": result}]}except Exception as e:# 捕获中断异常,让工作流暂停raise e
- else:# 后续调用,返回最终结果return{"messages": state["messages"]}defmain():# 创建状态图
- builder = StateGraph(AgentState)
- builder.add_node("agent", agent_node)
- builder.add_edge(START,"agent")
- builder.add_edge("agent", END)# 使用内存保存器作为检查点
- checkpointer = MemorySaver()# 编译图
- graph = builder.compile(checkpointer=checkpointer)# 配置线程ID
- config ={"configurable":{"thread_id":"email-workflow"}}# 初始化状态并执行图print("1. 启动邮件发送工作流...")try:
- initial = graph.invoke({"messages":[{"role":"user","content":"请发送邮件给alice@example.com关于会议安排"}]},
- config=config,)print(f"工作流中断信息: {initial['__interrupt__']}\n")# 模拟用户审批过程print("2. 模拟用户审批过程...")
- interrupt_value = initial["__interrupt__"][0].value
- print("操作:", interrupt_value["action"])print("消息:", interrupt_value["message"])print("收件人:", interrupt_value["to"])print("主题:", interrupt_value["subject"])print("正文:", interrupt_value["body"])# 获取用户输入whileTrue:
- user_input =input("\n是否批准发送邮件?(y/n): ").strip().lower()if user_input in['y','yes','是']:# 用户批准,可以编辑参数
- new_subject =input("请输入新主题(直接回车保持原主题): ").strip()ifnot new_subject:
- approval_response ={"action":"approve"}else:
- approval_response ={"action":"approve","subject": new_subject}breakelif user_input in['n','no','否']:
- approval_response ={"action":"reject"}breakelse:print("无效输入,请输入 y/yes/是 或 n/no/否")# 使用用户决定恢复执行print(f"\n3. 使用审批决定恢复工作流执行...")
- resumed = graph.invoke(
- Command(resume=approval_response),
- config=config,)# 显示最终结果print(f"最终消息: {resumed['messages'][-1]}")except Exception as e:print(f"执行过程中出现错误: {e}")if __name__ =="__main__":
- main()
复制代码 时间旅行(Time travel)
在处理基于模型做决策的非确定性系统(例如由大语言模型驱动的智能体)时,详细检查它们的决策过程可能会很有用:
(1)理解推理过程:分析达成成功结果的各个步骤。
(2)调试错误:确定错误发生的位置和原因。
(3)探索替代方案:测试不同的路径以发现更好的解决方案。
LangGraph 提供了时间回溯功能来支持这些使用场景。具体来说,可以从之前的检查点恢复执行——要么重放相同的状态,要么对其进行修改以探索其他可能性。在所有情况下,恢复过去的执行都会在历史记录中产生一个新的分支。
要在LangGraph中使用时间旅行:
(1)使用invoke或stream方法,以初始输入来运行图表。
(2)识别现有线程中的检查点:使用get_state_history方法检索特定thread_id的执行历史,并找到所需的checkpoint_id。或者,在希望执行暂停的节点之前设置一个interrupt。然后,你可以找到截至该中断记录的最新检查点。
(3)更新图状态(可选):使用update_state方法在检查点修改图的状态,并从替代状态恢复执行。
(4)从检查点恢复执行:使用invoke或stream方法,输入为None,配置中包含适当的thread_id和检查点ID
- import uuid
- from typing_extensions import TypedDict, NotRequired
- from langgraph.graph import StateGraph, START, END
- from langgraph.checkpoint.memory import MemorySaver
- classStoryState(TypedDict):
- character: NotRequired[str]
- setting: NotRequired[str]
- plot: NotRequired[str]
- ending: NotRequired[str]defcreate_character(state:StoryState):print("执行节点: create_character")
- mock_character='一直会说话的毛'print(f"创建的角色: {mock_character}")return{"character": mock_character}defset_setting(state:StoryState):print("执行节点: set_setting")
- mock_setting ="在一个神秘的图书馆里"print(f"设置的背景: {mock_setting}")return{"setting": mock_setting}defdevelop_plot(state: StoryState):print("执行节点: develop_plot")
- character = state.get("character","未知角色")
- setting = state.get("setting","未知背景")
- mock_plot =f"{character}在{setting}发现了一本会发光的书"print(f"发展的剧情: {mock_plot}")return{"plot": mock_plot}defwrite_ending(state:StoryState):print("执行节点: write_ending")
- plot = state.get("plot","未知剧情")
- mock_ending =f"当{plot}时,整个图书馆都被魔法光芒照亮了"print(f"编写的结局: {mock_ending}")return{"ending": mock_ending}defmain():
- workflow = StateGraph(StoryState)
- workflow.add_node("create_character", create_character)
- workflow.add_node("set_setting", set_setting)
- workflow.add_node("develop_plot", develop_plot)
- workflow.add_node("write_ending", write_ending)# 添加边来连接节点
- workflow.add_edge(START,"create_character")
- workflow.add_edge("create_character","set_setting")
- workflow.add_edge("set_setting","develop_plot")
- workflow.add_edge("develop_plot","write_ending")
- workflow.add_edge("write_ending", END)
- checkpointer=MemorySaver()
- graph=workflow.compile(checkpointer=checkpointer)print("1. 生成第一个故事...")
- config1 ={"configurable":{"thread_id":str(uuid.uuid4()),}}
- story1=graph.invoke({},config1)print(f"角色: {story1['character']}")print(f"背景: {story1['setting']}")print(f"剧情: {story1['plot']}")print(f"结局: {story1['ending']}")print()# 2. 查看历史状态print("2. 查看第一个故事的历史状态...")
- states1 =list(graph.get_state_history(config1))# 状态以栈的形式存储,索引从下往上print("历史状态:")for i, state inenumerate(states1):print(f" {i}. 下一步节点: {state.next}")print(f" 检查点ID: {state.config['configurable']['checkpoint_id']}")if state.values:print(f" 状态值: {state.values}")print()print("3. 从中间状态恢复执行,创建第二个故事...")
- character_state = states1[2]print(f"选中的状态: {character_state}")print(f"选中的状态值: {character_state.values}")
- new_config=graph.update_state(
- character_state.config,
- values={'character':"一只猪"})print(f"新配置: {new_config}")print()# 4. 从新检查点恢复执行print("4. 从新检查点恢复执行,生成第二个故事...")
- story2 = graph.invoke(None, new_config)print(f"新角色: {story2['character']}")print(f"背景: {story2['setting']}")print(f"剧情: {story2['plot']}")print(f"结局: {story2['ending']}")print()if __name__ =="__main__":
- main()
复制代码 记忆(Memory)
人工智能应用需要内存来在多次交互中共享上下文。在LangGraph中,你可以添加两种类型的内存:
(1)将短期记忆**(Checkpointer)**作为智能体状态的一部分添加,以实现多轮对话。
作用域:单个 thread(对话线程)存储内容:对话历史、状态数据生命周期:仅在当前 thread 内有效实现方式:通过 checkpointer 持久化 state
(2)添加长期记忆**(Store)**以跨会话存储用户特定数据或应用程序级数据。
作用域:跨 thread,可在任何对话中访问存储内容:用户偏好、知识库等跨会话数据生命周期:跨会话持久化实现方式:通过 store 存储,支持 namespace 组织
添加短期记忆
短期记忆(线程级持久性)使智能体能够跟踪多轮对话。
添加短期记忆就是制定检查点存储,并且指定线程id,按thread_id隔离。
如果图包含子图,只需在编译父图时提供检查点工具即可。LangGraph会自动将检查点工具传播到子图中。
添加长期记忆
利用长期记忆跨对话存储用户特定或应用程序特定的数据。- from typing import Annotated
- from typing_extensions import TypedDict
- from langchain_core.messages import HumanMessage, AIMessage
- from langgraph.graph import StateGraph, START
- from langgraph.store.memory import InMemoryStore
- # 定义状态classChatState(TypedDict):"""聊天状态定义"""
- messages: Annotated[list,lambda x, y: x + y]defchat_node(state: ChatState,*, store):"""
- 聊天节点
-
- Args:
- state: 当前状态
- store: 存储对象
-
- Returns:
- dict: 更新后的状态
- """print("执行节点: chat_node")# 获取用户ID(这里我们使用固定ID进行演示)
- user_id ="user_123"# 从存储中获取用户信息try:
- user_info_item = store.get(("users",), user_id)
- user_info = user_info_item.value if user_info_item else{}print(f"从存储中获取用户信息: {user_info}")except Exception as e:print(f"获取用户信息时出错: {e}")
- user_info ={}# 获取最新的用户消息
- user_messages =[msg for msg in state["messages"]ifisinstance(msg, HumanMessage)]if user_messages:
- latest_message = user_messages[-1].content
- print(f"用户消息: {latest_message}")# 从消息中提取信息if"我叫"in latest_message or"我是"in latest_message:# 简单提取姓名if"我叫"in latest_message:
- name_start = latest_message.find("我叫")+2else:
- name_start = latest_message.find("我是")+2
-
- name_end =len(latest_message)for i inrange(name_start,len(latest_message)):if latest_message[i]in[",",",",".","。","!","!","?","?"]:
- name_end = i
- break
- name = latest_message[name_start:name_end].strip()if name:
- user_info["name"]= name
-
- if"岁"in latest_message:# 提取年龄
- age_pos = latest_message.find("岁")
- age_str =""for i inrange(age_pos -1,-1,-1):if latest_message[i].isdigit():
- age_str = latest_message[i]+ age_str
- else:breakif age_str and age_str.isdigit():
- user_info["age"]=int(age_str)if"来自"in latest_message:# 提取位置
- location_start = latest_message.find("来自")+2
- location_end =len(latest_message)for i inrange(location_start,len(latest_message)):if latest_message[i]in[",",",",".","。","!","!","?","?"]:
- location_end = i
- break
- location = latest_message[location_start:location_end].strip()if location:
- user_info["location"]= location
-
- # 保存更新后的用户信息if user_info:try:
- store.put(("users",), user_id, user_info)print(f"保存用户信息到存储: {user_info}")except Exception as e:print(f"保存用户信息时出错: {e}")# 生成回复if"你好"in latest_message or"hello"in latest_message.lower():if user_info.get("name"):
- response =f"你好,{user_info['name']}!很高兴再次见到你。"else:
- response ="你好!我是AI助手。能告诉我你的名字吗?"elif"我叫"in latest_message or"我是"in latest_message:
- name = user_info.get("name","朋友")
- response =f"很高兴认识你,{name}!有什么我可以帮助你的吗?"elif"再见"in latest_message or"bye"in latest_message.lower():
- name = user_info.get("name","朋友")
- response =f"再见,{name}!期待下次与你交流。"else:# 基于用户信息的个性化回复
- info_parts =[]if user_info.get("name"):
- info_parts.append(f"名字是{user_info['name']}")if user_info.get("age"):
- info_parts.append(f"年龄是{user_info['age']}岁")if user_info.get("location"):
- info_parts.append(f"来自{user_info['location']}")if info_parts:
- info_summary =",而且我知道你"+",".join(info_parts)
- response =f"我理解你的问题。{info_summary}。让我来帮助你解答。"else:
- response ="我理解你的问题。让我来帮助你解答。"else:
- response ="我没有收到你的消息,请再说一遍。"print(f"生成的回复: {response}")return{"messages":[AIMessage(content=response)]}defmain():"""主函数 - 演示长期记忆功能"""print("=== LangGraph 长期记忆演示 ===\n")# 创建内存存储
- store = InMemoryStore()# 构建图
- builder = StateGraph(ChatState)
- builder.add_node("chat", chat_node)
- builder.add_edge(START,"chat")# 编译图并使用存储
- graph = builder.compile(store=store)# 第一轮对话print("1. 第一轮对话:")
- result1 = graph.invoke({"messages":[HumanMessage(content="你好,我叫张三,来自北京。")]})print("对话历史:")for msg in result1["messages"]:print(f" {type(msg).__name__}: {msg.content}")print()# 第二轮对话print("2. 第二轮对话:")
- result2 = graph.invoke({"messages":[HumanMessage(content="我今年25岁了。")]})print("对话历史:")for msg in result2["messages"]:print(f" {type(msg).__name__}: {msg.content}")print()# 第三轮对话print("3. 第三轮对话:")
- result3 = graph.invoke({"messages":[HumanMessage(content="你好!")]})print("对话历史:")for msg in result3["messages"]:print(f" {type(msg).__name__}: {msg.content}")print()# 查看存储的内容print("4. 查看存储的内容:")try:
- user_info_item = store.get(("users",),"user_123")if user_info_item:print(f"存储的用户信息: {user_info_item.value}")else:print("未找到用户信息")except Exception as e:print(f"查看存储内容时出错: {e}")print("\n=== 演示完成 ===")if __name__ =="__main__":
- main()
复制代码 管理短期记忆
启用短期记忆后,长对话可能会超出大语言模型(LLM)的上下文窗口。常见的解决方案包括:
修剪消息:删除开头或结尾的N条消息(在调用大语言模型之前)从LangGraph状态中永久删除消息总结消息:总结历史记录中较早的消息,并用摘要替换它们管理检查点以存储和检索消息历史自定义策略(例如,消息过滤等)
这使得智能体能够跟踪对话,同时不会超出大语言模型的上下文窗口。
修剪消息
大多数大语言模型都有一个最大支持的上下文窗口(以token为单位)。决定何时截断消息的一种方法是计算消息历史中的token数量,并在其接近该限制时进行截断。如果您使用LangChain,可以使用修剪消息工具,并指定要从列表中保留的token数量,以及用于处理边界的strategy(例如,保留最后的max_tokens)。
要修剪消息历史,请使用trim_messages函数:- from langchain_core.messages.utils import(
- trim_messages,
- count_tokens_approximately
- )
- messages = trim_messages(
- state["messages"],
- strategy="last",
- token_counter=count_tokens_approximately,
- max_tokens=128,
- start_on="human",
- end_on=("human","tool"),)
复制代码 删除消息
要从图状态中删除消息,可以使用RemoveMessage。RemoveMessage要正常工作,需要state的key带有add_messages这个reducer,例如MessagesState。- from langchain_core.messages import(
- RemoveMessage,)defdelete_messages(state: MessagesState):
- messages = state["messages"]print(f"删除前消息数量: {len(messages)}")iflen(messages)>2:# 删除最早的两条消息
- to_remove =[RemoveMessage(id=m.id)for m in messages[:2]]print(f"将删除 {len(to_remove)} 条消息")# 显示要删除的消息for i, msg inenumerate(messages[:2]):print(f" 删除消息 {i+1}: {type(msg).__name__} - {msg.content[:50]}{'...'iflen(msg.content)>50else''}")return{"messages": to_remove}else:print("消息数量不足,无需删除")return{}
复制代码 总结消息
修剪或删除消息存在一个问题,即清理消息队列时可能会丢失信息。正因为如此,一些应用程序会受益于一种更复杂的方法——使用聊天模型来总结消息历史。
提示词和编排逻辑可用于总结消息历史。
例如,在LangGraph中,你可以扩展MessagesState以包含一个summary键;然后,可以生成聊天历史的摘要,并将任何现有的摘要作为下一次摘要的上下文。在messages状态键中积累了一定数量的消息后,可以调用这个summarize_conversation节点。- defsummarize_conversation(messages: Sequence[BaseMessage], current_summary:str="")->str:ifnot messages:return current_summary
-
- # 如果有模型则调用,否则使用模拟摘要if summarization_model:try:# 构造总结提示
- summary_prompt =f"当前摘要: {current_summary}\\n\\n新对话:\\n"for msg in messages:ifisinstance(msg, HumanMessage):
- summary_prompt +=f"人类: {msg.content}\\n"elifisinstance(msg, AIMessage):
- summary_prompt +=f"AI: {msg.content}\\n"
-
- summary_prompt +="\\n请提供一个简洁的摘要,包含重要的信息和上下文:"
-
- response = summarization_model.invoke([SystemMessage(content=summary_prompt)])return response.content
- except Exception as e:print(f"调用总结模型出错: {e}")# 出错时使用模拟摘要pass# 模拟摘要生成
- summary_content =" ".join([msg.content for msg in messages[-3:]])# 取最后3条消息returnf"对话摘要: {summary_content[:100]}..."# 简单截取前100个字符
复制代码 子图
子图是一个用作另一个图中节点的图。子图可用于:
构建多个智能体系统。在多个图中重用一组节点。分布式开发:当你希望不同的团队独立处理图的不同部分时,你可以将每个部分定义为子图,而且只要遵循子图接口(输入和输出模式),父图就可以在不了解子图任何细节的情况下构建出来。
添加子图时,需要定义父图和子图如何通信:
从节点调用图—子图从父图的节点内部被调用。将图添加为节点——子图直接作为节点添加到父图中,并与父图共享状态。
使用方式
1)从节点调用 图
实现子图的一种简单方法是从另一个图的节点内部调用一个图。在这种情况下,子图可以与父图具有完全不同的模式(没有共享键)。例如,你可能希望为多智能体系统中的每个智能体保存私人消息历史。如果你的应用程序是这种情况,你需要定义一个节点函数来调用子图。该函数需要在调用子图之前将输入(父级)状态转换为子图状态,并在从节点返回状态更新之前将结果转换回父级状态。
**2)**将图形添加为节点
当父图和子图能够通过模式中的共享状态键(通道)进行通信时,你可以将一个图作为节点添加到另一个图中。例如,在多智能体系统中,智能体通常通过共享的消息键进行通信。
**3)**添加持久性
在编译父图时,你只需提供检查点工具。LangGraph会自动将检查点工具传播到子子图中。
**4)**查看子图状态
启用持久性后,您可以通过相应方法检查图状态(检查点)。要查看子图状态,可使用子图选项。
**5)**流式输出子图结
要在流式输出中包含子图的输出,你可以在父图的stream方法中设置subgraphs选项。这样会同时流式输出父图和所有子图的内容。- from typing import Annotated
- from typing_extensions import TypedDict
- from langgraph.graph.state import StateGraph, START
- from langgraph.checkpoint.memory import MemorySaver
- # 定义子图状态(不同的状态模式)classSubgraphState(TypedDict):
- bar:str
- baz:str# 定义父图状态(不同的状态模式)classParentState(TypedDict):
- foo:str# 定义共享状态的子图classSharedSubgraphState(TypedDict):
- foo:str# 共享状态键
- bar:str# 子图私有状态键# 定义用于中断演示的状态classInterruptState(TypedDict):
- foo:strdefsubgraph_node_1(state: SubgraphState):"""子图节点1"""print("执行子图节点1")return{"baz":"baz"}defsubgraph_node_2(state: SubgraphState):"""子图节点2"""print("执行子图节点2")return{"bar": state["bar"]+ state["baz"]}defshared_subgraph_node_1(state: SharedSubgraphState):"""共享状态子图节点1"""print("执行共享状态子图节点1")return{"bar":"bar"}defshared_subgraph_node_2(state: SharedSubgraphState):"""共享状态子图节点2"""print("执行共享状态子图节点2")return{"foo": state["foo"]+ state["bar"]}definterrupt_subgraph_node(state: InterruptState):"""用于中断演示的子图节点"""print("执行中断子图节点")# 模拟中断,实际应用中会使用 interrupt() 函数
- user_input =input("请输入值(模拟中断): ")return{"foo": state["foo"]+ user_input}defcreate_subgraph_different_schemas():"""创建具有不同状态模式的子图"""print("\\n=== 创建具有不同状态模式的子图 ===")
- subgraph_builder = StateGraph(SubgraphState)
- subgraph_builder.add_node("subgraph_node_1", subgraph_node_1)
- subgraph_builder.add_node("subgraph_node_2", subgraph_node_2)
- subgraph_builder.add_edge(START,"subgraph_node_1")
- subgraph_builder.add_edge("subgraph_node_1","subgraph_node_2")return subgraph_builder.compile()defnode_1(state: ParentState):"""父图节点1"""print("执行父图节点1")return{"foo":"hi! "+ state["foo"]}defnode_2(subgraph):"""父图节点2 - 调用子图"""def_call_subgraph(state: ParentState):print("执行父图节点2(调用子图)")# 转换状态到子图格式
- subgraph_input ={"bar": state["foo"],"baz":""}
- response = subgraph.invoke(subgraph_input)# 转换响应回父图格式return{"foo": response["bar"]}return _call_subgraph
- defcreate_parent_graph_with_subgraph_call(subgraph):"""创建通过节点调用子图的父图"""print("\\n=== 创建通过节点调用子图的父图 ===")
- builder = StateGraph(ParentState)
- builder.add_node("node_1", node_1)
- builder.add_node("node_2", node_2(subgraph))
- builder.add_edge(START,"node_1")
- builder.add_edge("node_1","node_2")return builder.compile()defcreate_shared_subgraph():"""创建具有共享状态的子图"""print("\\n=== 创建具有共享状态的子图 ===")
- subgraph_builder = StateGraph(SharedSubgraphState)
- subgraph_builder.add_node("shared_subgraph_node_1", shared_subgraph_node_1)
- subgraph_builder.add_node("shared_subgraph_node_2", shared_subgraph_node_2)
- subgraph_builder.add_edge(START,"shared_subgraph_node_1")
- subgraph_builder.add_edge("shared_subgraph_node_1","shared_subgraph_node_2")return subgraph_builder.compile()defcreate_parent_graph_with_node_subgraph(subgraph):"""创建将子图作为节点添加的父图"""print("\\n=== 创建将子图作为节点添加的父图 ===")
- builder = StateGraph(ParentState)
- builder.add_node("node_1", node_1)
- builder.add_node("node_2", subgraph)# 直接将子图作为节点添加
- builder.add_edge(START,"node_1")
- builder.add_edge("node_1","node_2")return builder.compile()defcreate_interrupt_subgraph():"""创建用于中断演示的子图"""print("\\n=== 创建用于中断演示的子图 ===")
- subgraph_builder = StateGraph(InterruptState)
- subgraph_builder.add_node("interrupt_subgraph_node", interrupt_subgraph_node)
- subgraph_builder.add_edge(START,"interrupt_subgraph_node")return subgraph_builder.compile()defcreate_parent_graph_with_interrupt_subgraph(subgraph):"""创建包含中断子图的父图"""print("\\n=== 创建包含中断子图的父图 ===")
- builder = StateGraph(InterruptState)
- builder.add_node("node_1", subgraph)
- builder.add_edge(START,"node_1")return builder.compile()defdemo_subgraph_call():"""演示从节点调用图"""print("\\n=== 演示从节点调用图 ===")
- subgraph = create_subgraph_different_schemas()
- parent_graph = create_parent_graph_with_subgraph_call(subgraph)print("开始执行图:")for chunk in parent_graph.stream({"foo":"foo"}, subgraphs=True):print(f"流式输出: {chunk}")defdemo_add_graph_as_node():"""演示将图添加为节点"""print("\\n=== 演示将图添加为节点 ===")
- subgraph = create_shared_subgraph()
- parent_graph = create_parent_graph_with_node_subgraph(subgraph)print("开始执行图:")for chunk in parent_graph.stream({"foo":"foo"}):print(f"流式输出: {chunk}")defdemo_subgraph_streaming():"""演示流式输出子图结果"""print("\\n=== 演示流式输出子图结果 ===")
- subgraph = create_shared_subgraph()
- parent_graph = create_parent_graph_with_node_subgraph(subgraph)print("开始流式执行图:")for chunk in parent_graph.stream({"foo":"foo"},
- stream_mode="updates",
- subgraphs=True,):print(f"流式输出: {chunk}")defmain():"""主函数"""print("=== LangGraph 子图功能演示 ===")# 演示从节点调用图
- demo_subgraph_call()print("\\n"+"="*50+"\\n")# 演示将图添加为节点
- demo_add_graph_as_node()print("\\n"+"="*50+"\\n")# 演示流式输出子图结果
- demo_subgraph_streaming()print("\\n=== 演示完成 ===")if __name__ =="__main__":
- main()
复制代码 原文地址:https://blog.csdn.net/qq_63945427/article/details/160481309 |