作者:CSDN博客
LangGraph 内置了一个持久层,可以将图状态保存为检查点。当您使用检查点编译图时,执行的每一步的图状态都会保存成快照,并将其组织成线程。checkpointer非常重要,它是LangGraph中人机协作、对话记忆、中断恢复、时间旅行等核心功能的基础。
checkpoint可以选择保存在内存中或持久化到数据库中,我们在开发调试阶段通常可以选择保存在内存中,生产过阶段一定要持久化到数据库中。
一、使用检查点
- from langgraph.graph import StateGraph, START, END
- from langgraph.checkpoint.memory import InMemorySaver
- from langchain_core.runnables import RunnableConfig
- from typing import Annotated
- from typing_extensions import TypedDict
- from operator import add
- # 自定义状态
- class State(TypedDict):
- foo: str
- bar: Annotated[list[str], add]
- # 定义节点a
- def node_a(state: State):
- return {"foo": "a", "bar": ["a"]}
- # 定义节点b
- def node_b(state: State):
- return {"foo": "b", "bar": ["b"]}
- # 定义图
- workflow = StateGraph(State)
- workflow.add_node(node_a)
- workflow.add_node(node_b)
- workflow.add_edge(START, "node_a")
- workflow.add_edge("node_a", "node_b")
- workflow.add_edge("node_b", END)
- # 定义内存检查点
- checkpointer = InMemorySaver()
- # 使用checkpointer
- graph = workflow.compile(checkpointer=checkpointer)
- # 指定线程id
- config: RunnableConfig = {"configurable": {"thread_id": "1"}}
- result = graph.invoke({"foo": "", "bar":[]}, config)
- print(result)
复制代码 输出结果:
{'foo': 'b', 'bar': ['a', 'b']}
检查点1:空检查点,位于START节点之前,包含的内容为空检查点2:包含用户输入{'foo': '', 'bar': []},位于START和node_a之间检查点3:包含节点a的输出{'foo': 'a', 'bar': ['a']},位于node_a和node_b之间检查点4:包含{'foo': 'b', 'bar': ['a', 'b']},至此图已结束,后续没有节点需要执行
状态中的字段bar,因为定义的时候是Annotated[list[str], add]类型,其中的add定义了列表元素的重写方式是拼接。启动执行时,我们除了传入初始化的状态信息,还传入了一个config信息,指定了thread_id;如果使用了检查点进行编译图,就必须指定thread_id作为保存和查询检查点的唯一标识,否则就无法保存状态,否则程序会报错,这里大家可以自行尝试。
二、获取状态
- # 根据thread_id获取当前最新的状态
- config = {"configurable": {"thread_id": "1"}}
- print(graph.get_state(config))
- # 根据thread_id和checkpoint_id(检查点id)获取指定检查点的状态
- config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
- print(graph.get_state(config))
复制代码 输出如下类似结果:
StateSnapshot(values={'foo': 'b', 'bar': ['a', 'b']}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad3e-311d-6a98-8002-fd67129c8a6f'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2026-04-18T03:08:34.730448+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad3e-311c-660c-8001-10e55b51e9b4'}}, tasks=(), interrupts=())
调用graph.get_state方法会返回一个状态快照对象StateSnapshot,里面包含状态的内容,状态所属的线程id,检查点id以及一些元数据等
三、获取历史状态
- config = {"configurable": {"thread_id": "1"}}
- print(list(graph.get_state_history(config)))
复制代码 输出结果:
[StateSnapshot(values={'foo': 'b', 'bar': ['a', 'b']}, next=(), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d70-6522-8002-18cb8a9c2920'}}, metadata={'source': 'loop', 'step': 2, 'parents': {}}, created_at='2026-04-18T03:21:07.641875+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d6f-632a-8001-9ab89bfb348b'}}, tasks=(), interrupts=()),
StateSnapshot(values={'foo': 'a', 'bar': ['a']}, next=('node_b',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d6f-632a-8001-9ab89bfb348b'}}, metadata={'source': 'loop', 'step': 1, 'parents': {}}, created_at='2026-04-18T03:21:07.641414+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d6d-60b6-8000-f56001163fe7'}}, tasks=(PregelTask(id='3bfd0ac1-5282-474b-72bf-9fb36ddc1d53', name='node_b', path=('__pregel_pull', 'node_b'), error=None, interrupts=(), state=None, result={'foo': 'b', 'bar': ['b']}),), interrupts=()),
StateSnapshot(values={'foo': '', 'bar': []}, next=('node_a',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d6d-60b6-8000-f56001163fe7'}}, metadata={'source': 'loop', 'step': 0, 'parents': {}}, created_at='2026-04-18T03:21:07.640512+00:00', parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d66-648c-bfff-2a16f380b5a4'}}, tasks=(PregelTask(id='2ec94e33-4f22-0756-b2b3-6e069e77a6c5', name='node_a', path=('__pregel_pull', 'node_a'), error=None, interrupts=(), state=None, result={'foo': 'a', 'bar': ['a']}),), interrupts=()),
StateSnapshot(values={'bar': []}, next=('__start__',), config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f13ad5a-3d66-648c-bfff-2a16f380b5a4'}}, metadata={'source': 'input', 'step': -1, 'parents': {}}, created_at='2026-04-18T03:21:07.637766+00:00', parent_config=None, tasks=(PregelTask(id='552648e3-7d04-2dc8-5c52-105425a246d7', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'foo': '', 'bar': []}),), interrupts=())]
调用graph.get_state_history(config)方法会返回指定线程的所有历史状态
你可以根据特定的条件筛选特定的一个检查点,如通过next、step等信息进行筛选
四、持久化到数据库
安装依赖- pip install langgraph-checkpoint-sqlite
复制代码 代码- from langgraph.checkpoint.sqlite import SqliteSaver
- with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
- # 使用checkpointer
- graph = workflow.compile(checkpointer=checkpointer)
- config: RunnableConfig = {"configurable": {"thread_id": "1"}}
- result = graph.invoke({"foo": "", "bar":[]}, config)
- print(result)
- config = {"configurable": {"thread_id": "1"}}
- print(list(graph.get_state_history(config)))
复制代码 执行后可以看到当前文件夹中生成了一个sqlitedb文件,打开后可以开到状态的数据已经持久化到数据库中了
除了保存在sqlitedb中,还可以选择其他的数据库如Postgres、Azure Cosmos DB等生产环境中,我们一定要使用数据库持久化来保证断电等异常情况下仍然可以恢复执行
五、总结
使用了检查点进行编译的图,执行的每一步的状态都会保存成一个快照,用于记忆、中断恢复,重播等操作使用检查点编译,必须指定thread_id来唯一标识检查点的存储以及方便后续查询,否则程序会报错graph.get_state方法可以获取当前最新的快照或指定检查点的快照graph.get_state_history可以获取整个历史状态,我们可以从中筛选符合特定条件的状态状态快照在开发调试阶段可以保存在内存中,生产阶段需要保存在数据库中
原文地址:https://blog.csdn.net/lilianggui/article/details/149094448 |