AI创想

标题: LangGraph——AI应用开发框架 [打印本页]

作者: 米落枫    时间: 4 小时前
标题: LangGraph——AI应用开发框架
作者:CSDN博客
目录
一,LangGraph介绍
1.1 认识智能服务(Agent Server)
1.2 LangGraph -- Agent Server 的“操作系统”
1.3 LangGraph生态系统
二,LangGraph核心概念
2.1 Agent
2.2 工作流
2.3 实现工作流的核心概念
State(状态) - 快递的"包裹信息"
Nodes(节点) - 快递站点
Edges(边) - 快递运输路线
三,LangGraph简单示例
3.1 LangGraph编码思路
3.2 智能快递配送系统
1.定义 State,设置快递的"包裹信息"
2.定义 StateGraph 图,成⽴快递公司
3.定义 Nodes,创建配送站点
4.添加 Nodes,建设配送站点
5.添加 Edges,规划运输路线
6.StateGraph 图编译,从公司创建到运⾏
7.完整代码+测试用例
3.3 支持搜索的智能代理系统
1.设置 Nodes
2.设置 State
3.设置 Edges
4.完整代码
3.4基于LangGraph实现的RAG
1.装备知识库并创建检索工具
2.创建节点
3.组装"工作流水线"
4.完整代码

一,LangGraph介绍

LangGraph 是 LangChain 生态中用于构建有状态、图驱动、支持循环 / 分支 / 多智能体协作LLM工作流编排框架,核心是用「状态 + 节点 + 边」建模复杂 AI 流程,解决传统线性链难以处理的循环、多轮决策、多角色协作问题。
在 LangChain 中,我们接入 LLM 构建的 AI 系统有以下特点:
以前用 LangChain 只能写:
LangGraph 就是为了解决这些:
1.1 认识智能服务(Agent Server)

什么是 Agent Server?它是更聪明的 AI 助⼿
想象一个虚拟的“小助手”,它不仅能回答问题,还能:
构建 Agent Server 时遇到的四⼤难题:
难题
描述
状态丢失
AI处理长任务时容易"忘记"前面步骤
难以调试
不知道AI为什么做出某个决定
无法干预
不能中途给AI指导
部署困难
复杂Agent Server难以上线运行
1.2 LangGraph -- Agent Server 的“操作系统”

LangGraph 就像是一个强大且灵活的“Agent Server 操作系统内核”。
它不关心我们具体用什么模型或提示词,而是为我们解决构建复杂、可靠、可交互的 Agent Server 时所面临的状态管理、流程编排、持久化和人工监督等底层⼯程难题。如果我们需要构建超越简单问答的、具备复杂逻辑和长期记忆的AI 应用,LangGraph 就是为此设计的⼯具。
如果把 Agent Server 比作一个公司,那么:
简单来说,LangGraph 是一个专门用于构建和管理 Agent Server 的底层框架。
1.3 LangGraph生态系统

LangGraph 是 LangChain 产品家族的一部分。可以与 LangSmith(用于追踪、评估、监控)、LangGraph 部署平台(⽤于轻松部署和扩展Agent Server)以及 LangChain(提供大量集成和组件)结合使⽤,形成完整的开发、调试、部署⼯作流。
LangGraph 与 LangChain 顺利集成,但也可以在没有 LangChain 的情况下使用。
二,LangGraph核心概念

2.1 Agent

Agent (智能体) 是⼀个能够感知环境输入、⾃主决策、规划⾏动路径, 并可调⽤⼯具或执⾏操作以达成目标的自主性软件实体。
其核心在于:由大语言模型 (LLM) 动态控制流程走向
想象你有一个超级聪明的程序员助手,你只需要说一句:"把这个项目的单元测试覆盖率提升到 80%."
然后他就自己去看代码、写测试、运⾏构建、检查结果……直到达标为为止⸺中间不需要你插手.
这个"能独立完成任务"的程序, 就是我们说的 Agent (智能体) .
想象一个私人助理机器人:
你说:"帮我安排下周去上海的差旅. "
它不会傻傻地只回复"好的", 而是会:
a. 查天气→ 决定带什么衣服
b. 查航班/高铁时间 → 比较价格
c. 预订酒店 → 发送日程到你的手机
d. 提醒你带身份证
整个过程没有固定的流程图, 它自己"想"出来的步骤 ⸺ 这就是典型的 Agent 行为
2.2 工作流

Workflow (⼯作流) 是⼀个将复杂过程分解为定义明确、顺序执⾏的任务流程, 并在其中⾃动化传递数据与任务的状态, 用户完成特定⽬标. 如果说 Agent 是聪明的执⾏者,那 ⼯作流就是它的⾏动蓝图 ⸺它定义了“先做什么、后做什么、在什么条件下跳转或终止”。
其核心在于:预设的、可重复的流程路径.
比如公司的报销流程
提交发票 → 领导审批 → 财务审核 → 财务结算 → 归档完成
或者一个自动化客服工单系统
用户提交问题 → 系统⾃动分类 → 分配给对应客服 → 客服处理 → 用户评价 → 工单关闭
这个固定路径, 整个过程沿着预设的流程运⾏, 就是典型的⼯作流⾏为.
LangGraph 是实现的就是⼯作流!
在 LangGraph中,工作流基于 LLM ,以及添加到其中的各种增强功能(如工具调用、结构化输出和短期记忆)而实现:
(, 下载次数: 0)


LangGraph 是一个强大且灵活的“Agent Server 操作系统内核”。它不关心我们具体用什么模型或提示词,⽽是为我们解决构建复杂、可靠、可交互的 Agent Server 时所面临的状态管理、流程编排、持久化和人工监督等底层⼯程难题。
2.3 实现工作流的核心概念

如何实现工作流逻辑?图计算是一种⽤节点和边来表示复杂系统的方法。在 AI 领域,它特别适合构建多步骤、有状态的智能工作流。
想象一个快递配送系统,下图展示了包裹从输入(揽收站)到输出(配送站)的过程:
(, 下载次数: 0)


(, 下载次数: 0)


State(状态) - 快递的"包裹信息"

State 就像快递包裹上的标签,记录着包裹的当前位置、目的地、配送状态等信息。在整个配送过程中,所有站点都能查看和更新这个信息。
状态特性:
代码示例:
  1. from typing import TypedDict
  2. class PackageState(TypedDict):
  3.     # 包裹的状态
  4.     package_id:str   #包裹ID
  5.     origin:str       #包裹的始发地
  6.     destination:str  #包裹的目的地
  7.     #配送状态("揽收中","配送中",...)
  8.     status:str
  9.     # 包裹的流转历史
  10.     # history:list[str]     #覆盖更新
  11.     history:Annotated[list[str],operator.add]   #追加更新
  12.     # 总里程
  13.     total_instance: Annotated[int,operator.add]
  14.     # 配送详情("加急","普通")
  15.     priority:str
复制代码
所有配送站点共享的包裹信息卡就是 LangGraph 中的 State!
Nodes(节点) - 快递站点

节点就像快递配送⽹络中的各个站点,每个站点负责特定的处理步骤。例如:
揽收站:接收包裹,初始化信息
分拣中心:根据目的地分类包裹
派送站:最终配送至收件人
节点特征:
代码示例(节点只不过是函数):
  1. import operator
  2. from typing import TypedDict, Annotated
  3. # 揽收站节点
  4. def receive_package(state:PackageState):
  5.     """揽收站"""
  6.     #获取出发地
  7.     origin = state["origin"]
  8.     return {
  9.         "status": "已揽收",
  10.         "history": [f"在{origin}揽收"]
  11.     }
  12. # 分拣中心节点:根据目的地进行分拣
  13. def sort_package(state:PackageState):
  14.     """分拣中心站"""
  15.     #获取目的地
  16.     destination = state["destination"]
  17.     if "北京" in destination:
  18.         next = "北京分拣中心"
  19.     elif "西安" in destination:
  20.         next = "西安分拣中心"
  21.     else:
  22.         next = "其他分拣中心"
  23.     return {
  24.         "status" : "已分拣",
  25.         "history": [f"在{next}分拣"]
  26.     }
  27. # 派送站节点
  28. def final_delivery(state:PackageState):
  29.     """派送站"""
  30.     destination = state["destination"]
  31.     return {
  32.         "status":"已签收",
  33.         "history":[f"在{destination}签收"]
  34.     }
  35. # 普通配送站节点
  36. def standard_delivery(state:PackageState):
  37.     """普通配送"""
  38.     return {
  39.         "status":"运输中",
  40.         "history":["标准陆运"],
  41.         "total_instance":500
  42.     }
  43. # 加急配送站节点
  44. def express_delivery(state:PackageState):
  45.     """加急配送"""
  46.     return {
  47.         "status":"加急运输",
  48.         "history":["加急空运"],
  49.         "total_instance":800
  50.     }
复制代码
Edges(边) - 快递运输路线

边定义了包裹在站点之间的流动路径,就像快递公司的运输路线图。对于路线,一般类型有:
实际上在 LangGraph 中,边就定义了节点之间的连接关系,决定了⼯作流的执⾏顺序。边的类型有:
因此,LangGraph 通过节点(每个处理步骤)、边(步骤之间的连接)和状态(保存执行过程),就可以构建出一个任务工作流(图)。
三,LangGraph简单示例

3.1 LangGraph编码思路

构建 Graph 图,首先需要【定义状态】,然后【定义并添加节点和边】,最后【编译】它。编译提供了对图形结构的一些基本检查(没有孤立节点等)。
LangGraph 所谓的“编译” 与 传统意义上的语言编译完全不同,LangGraph 编译本质是在运行时动态构建和验证一个复杂的图,而非翻译代码。
3.2 智能快递配送系统

1.定义 State,设置快递的"包裹信息"

定义图时要做的第一件事是定义图的 状态 。 状态 将是图中所有 节点 和 边 的输入,可以是TypedDict 或 Pydantic 模型( Pydantic 的性能不如 TypedDict )。
如下所示:
  1. class PackageState(TypedDict):
  2.     # 包裹的状态
  3.     package_id:str   #包裹ID
  4.     origin:str       #包裹的始发地
  5.     destination:str  #包裹的目的地
  6.     #配送状态("揽收中","配送中",...)
  7.     status:str
  8.     # 包裹的流转历史
  9.     # history:list[str]     #覆盖更新
  10.     history:Annotated[list[str],operator.add]   #追加更新
  11.     # 总里程
  12.     total_instance: Annotated[int,operator.add]
  13.     # 配送详情("加急","普通")
  14.     priority:str
复制代码
状态支持更新的关键是 Reducers。在示例中,我们使用 Annotated 类型为 history 和total_distance 指定了一个 reducer 函数 ( operator.add )。这定义了如何更新包裹信息,特别是当多个站点都要记录信息时。当状态更新时,新的值会追加到现有值中,而不是替换。
如下所示:
  1. # 覆盖更新:每次新状态替换旧状态
  2. status: str
  3. # 追加更新:新的流转记录添加到历史列表
  4. history: Annotated[list[str], add]
  5. # 数值累加:⾥程数累加
  6. total_distance: Annotated[int, add]
复制代码
2.定义 StateGraph 图,成⽴快递公司

StateGraph 是一个有状态图计算框架,它基于有向图(Directed Graph) 模型构建,专门设计用于处理多步骤、有状态的工作流程。
StateGraph 用来将复杂的工作流程可视化、模块化,让开发者能够像设计快递配送网路一样设计软件系统。通过这种思维方式,即使是复杂的多步骤 AI 应⽤也变得清晰可控。
我们需要使用 langgraph.graph.state.StateGraph 来定义。 StateGraph 仅是⼀个构建器类,可以使⽤State来构建。
如下所示:
  1. from langgraph.graph import StateGraph
  2. # 2. 成⽴快递公司
  3. delivery = StateGraph(PackageState)
复制代码
3.定义 Nodes,创建配送站点

接着我们可以定义各个配送站点(节点)。在 LangGraph 中, 节点就是一个Python 函数(同步或异步)。注意
如下所示:
  1. # 揽收站节点
  2. def receive_package(state:PackageState):
  3.     """揽收站"""
  4.     #获取出发地
  5.     origin = state["origin"]
  6.     return {
  7.         "status": "已揽收",
  8.         "history": [f"在{origin}揽收"]
  9.     }
  10. # 分拣中心节点:根据目的地进行分拣
  11. def sort_package(state:PackageState):
  12.     """分拣中心站"""
  13.     #获取目的地
  14.     destination = state["destination"]
  15.     if "北京" in destination:
  16.         next = "北京分拣中心"
  17.     elif "西安" in destination:
  18.         next = "西安分拣中心"
  19.     else:
  20.         next = "其他分拣中心"
  21.     return {
  22.         "status" : "已分拣",
  23.         "history": [f"在{next}分拣"]
  24.     }
  25. # 派送站节点
  26. def final_delivery(state:PackageState):
  27.     """派送站"""
  28.     destination = state["destination"]
  29.     return {
  30.         "status":"已签收",
  31.         "history":[f"在{destination}签收"]
  32.     }
  33. # 普通配送站节点
  34. def standard_delivery(state:PackageState):
  35.     """普通配送"""
  36.     return {
  37.         "status":"运输中",
  38.         "history":["标准陆运"],
  39.         "total_instance":500
  40.     }
  41. # 加急配送站节点
  42. def express_delivery(state:PackageState):
  43.     """加急配送"""
  44.     return {
  45.         "status":"加急运输",
  46.         "history":["加急空运"],
  47.         "total_instance":800
  48.     }
复制代码
4.添加 Nodes,建设配送站点

接下来,我们需要将各节点,组织进图中,即添加节点到 StateGraph 中。可以使用add_node() 将新节点添加到 StateGraph 。
  1. # 3.定义图
  2. delivery = StateGraph(PackageState)
  3. # 4.添加节点
  4. delivery.add_node("揽收站",receive_package)
  5. delivery.add_node("分拣中心",sort_package)
  6. delivery.add_node("派送站",final_delivery)
  7. delivery.add_node("普通派送站",standard_delivery)
  8. delivery.add_node("加急派送站",express_delivery)
复制代码
5.添加 Edges,规划运输路线

各节点(站点)准备好后,则需要为快递运输规划。
实际上,这就是为 图 定义 边 。边有几种关键类型:
例如,设置最简单运输路线:快递由揽收站接收,下⼀站固定为分拣中⼼,最后到派送中进⾏派送。
  1. # 固定边
  2. delivery.add_edge(START,"揽收站")
  3. delivery.add_edge("揽收站","分拣中心")
  4. def select_delivery(state:PackageState):
  5.     priority=state["priority"]
  6.     if priority=="加急":
  7.         return "备注加急"
  8.     elif priority=="普通":
  9.         return "无备注"
  10. # 添加条件边
  11. delivery.add_conditional_edges(
  12.     "分拣中心",      #条件起始节点
  13.     select_delivery,      #确定下个节点的可调用对象
  14.     {
  15.         "备注加急":"加急派送站",
  16.         "无备注":"普通派送站"
  17.     }
  18. )
  19. # 添加固定边
  20. delivery.add_edge("加急派送站","派送站")
  21. delivery.add_edge("普通派送站","派送站")
  22. delivery.add_edge("派送站",END)
复制代码
在 LangGraph 中:
6.StateGraph 图编译,从公司创建到运⾏

在步骤2中,我们仅是构建出 StateGraph ,还无法直接⽤于执⾏。LangGraph 要求:必须先编译图,然后才能使⽤它。编译提供了对图结构的一些基本检查,这会验证:
使⽤ compile() 方法即可编译图。该⽅法将 StateGraph 编译为 CompiledStateGraph 对象。编译后的图实现了 Runnable 接⼝,可以异步调⽤、流式传输、批处理和运⾏。
代码如下:
  1. # 7. 编译系统
  2. delivery_system = delivery.compile()
复制代码
7.完整代码+测试用例
  1. # 智能快递派送系统
  2. import operator
  3. from typing import TypedDict, Annotated
  4. from langgraph.constants import START, END
  5. from langgraph.graph import StateGraph
  6. # 1.状态定义(定义包裹的状态)
  7. class PackageState(TypedDict):
  8.     # 包裹的状态
  9.     package_id:str   #包裹ID
  10.     origin:str       #包裹的始发地
  11.     destination:str  #包裹的目的地
  12.     #配送状态("揽收中","配送中",...)
  13.     status:str
  14.     # 包裹的流转历史
  15.     # history:list[str]     #覆盖更新
  16.     history:Annotated[list[str],operator.add]   #追加更新
  17.     # 总里程
  18.     total_instance: Annotated[int,operator.add]
  19.     # 配送详情("加急","普通")
  20.     priority:str
  21. # 2.节点定义(定义函数)
  22. # 节点之间使用状态(state)进行通信
  23. # 揽收站节点
  24. def receive_package(state:PackageState):
  25.     """揽收站"""
  26.     #获取出发地
  27.     origin = state["origin"]
  28.     return {
  29.         "status": "已揽收",
  30.         "history": [f"在{origin}揽收"]
  31.     }
  32. # 分拣中心节点:根据目的地进行分拣
  33. def sort_package(state:PackageState):
  34.     """分拣中心站"""
  35.     #获取目的地
  36.     destination = state["destination"]
  37.     if "北京" in destination:
  38.         next = "北京分拣中心"
  39.     elif "西安" in destination:
  40.         next = "西安分拣中心"
  41.     else:
  42.         next = "其他分拣中心"
  43.     return {
  44.         "status" : "已分拣",
  45.         "history": [f"在{next}分拣"]
  46.     }
  47. # 派送站节点
  48. def final_delivery(state:PackageState):
  49.     """派送站"""
  50.     destination = state["destination"]
  51.     return {
  52.         "status":"已签收",
  53.         "history":[f"在{destination}签收"]
  54.     }
  55. # 普通配送站节点
  56. def standard_delivery(state:PackageState):
  57.     """普通配送"""
  58.     return {
  59.         "status":"运输中",
  60.         "history":["标准陆运"],
  61.         "total_instance":500
  62.     }
  63. # 加急配送站节点
  64. def express_delivery(state:PackageState):
  65.     """加急配送"""
  66.     return {
  67.         "status":"加急运输",
  68.         "history":["加急空运"],
  69.         "total_instance":800
  70.     }
  71. # 3.定义图
  72. delivery = StateGraph(PackageState)
  73. # 4.添加节点
  74. delivery.add_node("揽收站",receive_package)
  75. delivery.add_node("分拣中心",sort_package)
  76. delivery.add_node("派送站",final_delivery)
  77. delivery.add_node("普通派送站",standard_delivery)
  78. delivery.add_node("加急派送站",express_delivery)
  79. # 5.添加边
  80. # 固定边
  81. delivery.add_edge(START,"揽收站")
  82. delivery.add_edge("揽收站","分拣中心")
  83. def select_delivery(state:PackageState):
  84.     priority=state["priority"]
  85.     if priority=="加急":
  86.         return "备注加急"
  87.     elif priority=="普通":
  88.         return "无备注"
  89. # 添加条件边
  90. delivery.add_conditional_edges(
  91.     "分拣中心",      #条件起始节点
  92.     select_delivery,      #确定下个节点的可调用对象
  93.     {
  94.         "备注加急":"加急派送站",
  95.         "无备注":"普通派送站"
  96.     }
  97. )
  98. # 添加固定边
  99. delivery.add_edge("加急派送站","派送站")
  100. delivery.add_edge("普通派送站","派送站")
  101. delivery.add_edge("派送站",END)
  102. # 6.编译图
  103. delivery_system = delivery.compile()
  104. # 7.执行图(输入初始状态,输出最终状态)
  105. test_packages=[
  106.     {
  107.         "package_id":"p001",
  108.         "origin":"北京",
  109.         "destination":"上海",
  110.         "priority":"加急",
  111.         "history":[],
  112.         "total_instance":0
  113.     },
  114.     {
  115.         "package_id": "p002",
  116.         "origin": "西安",
  117.         "destination": "杭州",
  118.         "priority": "普通",
  119.         "history": [],
  120.         "total_instance": 0
  121.     }
  122. ]
  123. for package in test_packages:
  124.     print(f"\n配送包裹:{package['package_id']}")
  125.     #执行图
  126.     result = delivery_system.invoke(package)
  127.     print("最终状态:",result["status"])
  128.     print("配送历史",result["history"])
  129.     print("总里程",result["total_instance"])
复制代码
(, 下载次数: 0)


到此,我们已经构建出了⼀个图式的智能快递配送系统:
3.3 支持搜索的智能代理系统

核心功能如下:
流程设置如下
(, 下载次数: 0)


1.设置 Nodes

根据节点的单一职责特性,即每个节点只做一件事,我们可以设置:
节点1( tool_node ):专门负责搜索,获取搜索结果。
节点2( llm_call ):专门负责调用LLM,获取最终结果。
根据节点的独立性特性,即每个节点间不直接通信,⽽是通过 State 交互。它们接收 State,返回State 的更新。
2.设置 State

根据状态的共享性特性,即所有节点都能读取和修改状态。LangGraph 在许多情况下,将以前的对话历史记录存储为 State 中的消息列表会很有帮助,这样就能通过 State 中的消息列表来跟踪整个对话的完整历史,这是构建对话系统的关键。
为此,我们可以在图状态中添加一个 messages 键,如下所示:
  1. class MessageState(TypedDict):
  2.     #消息列表(AIMessage,ToolMessage,HumanMessage)
  3.     messages:Annotated[list[AnyMessage],operator.add]
  4.     #LLM调用次数
  5.     llm_calls:Annotated[int,operator.add]
复制代码
3.设置 Edges

已经定义了两个节点:
因此,可以定义以下逻辑:
4.完整代码
  1. # 支持搜索的智能代理系统
  2. import operator
  3. from typing import TypedDict, Annotated
  4. from langchain.chat_models import init_chat_model
  5. from langchain_core.messages import AnyMessage, SystemMessage, ToolMessage, HumanMessage
  6. from langchain_tavily import TavilySearch
  7. from langgraph.constants import START, END
  8. from langgraph.graph import StateGraph
  9. # 定义工具,模型绑定工具
  10. search=TavilySearch(max_results=4)
  11. tools =[search]
  12. model = init_chat_model(model="gpt-4o-mini")
  13. #绑定工具
  14. model_with_tools=model.bind_tools(tools)
  15. # 1.定义状态
  16. class MessageState(TypedDict):
  17.     #消息列表(AIMessage,ToolMessage,HumanMessage)
  18.     messages:Annotated[list[AnyMessage],operator.add]
  19.     #LLM调用次数
  20.     llm_calls:Annotated[int,operator.add]
  21. # 2.定义节点
  22. # 定义调用llm节点
  23. # 可能是由START节点过来的
  24. # 也可能是由"工具调用"节点过来的
  25. def llm_call(state:MessageState):
  26.     """LLM决定是否调用工具"""
  27.     # 如果是由START节点过来的,state中的messages就只包含用户输入的消息(HumanMessage)
  28.     # 如果是由"工具调用"节点过来的,state中的messages就包含(HumanMessage,AIMessage,ToolMessage)
  29.     messages = state["messages"]
  30.     # result可能1: 带有tool_calls的AIMessage
  31.     # result可能2: 不带tool_calls的AIMesssage
  32.     result = model_with_tools.invoke(
  33.         [
  34.             SystemMessage("你是一个乐于助人的助手,支持调用工具进行搜索")
  35.         ]
  36.         + messages
  37.     )
  38.     return {
  39.         "messages": [result],
  40.         "llm_calls":1
  41.     }
  42. # 获取所有工具的名称
  43. # 工具名称和工具的映射
  44. tools_by_name = {tool.name:tool for tool in tools}
  45. # 定义调用工具节点
  46. def tool_node(state:MessageState):
  47.     # 此时的AIMessage中一定包含工具调用的信息(tool_calls)
  48.     # 我们需要找到对应的工具并调用
  49.     # 当前state中的messages中的AIMessage就包含工具调用的信息(tool_call)
  50.     # 例如: 工具的ID,工具的参数,工具的名称
  51.     result = []    # 填充ToolMessage(工具的执行结果)
  52.     for tool_call in state["messages"][-1].tool_calls:
  53.         tool = tools_by_name[tool_call["name"]] # 由工具名称找到对应的工具
  54.         obs = tool.invoke(tool_call["args"])    # 传入参数,调用工具
  55.         result.append(ToolMessage(content=obs,tool_call_id=tool_call["id"]))
  56.     return {
  57.         "messages":result
  58.     }
  59. # 3.定义图,添加节点和边
  60. agent_builder = StateGraph(MessageState)
  61. agent_builder.add_node(llm_call)
  62. agent_builder.add_node(tool_node)
  63. # 设置固定边
  64. agent_builder.add_edge(START,"llm_call")
  65. def should_continue(state:MessageState):
  66.     # 如果state中的messages的AIMessage中包含tool_call,则要走tool_node节点
  67.     last_message = state["messages"][-1]
  68.     if last_message.tool_calls:
  69.         return "tool_node"
  70.     return END
  71. # 设置条件边
  72. agent_builder.add_conditional_edges(
  73.     "llm_call",
  74.     should_continue,
  75.     ["tool_node",END]
  76. )
  77. agent_builder.add_edge("tool_node","llm_call")
  78. # 4.编译图
  79. agent_search = agent_builder.compile()
  80. # 5.执行图
  81. result = agent_search.invoke({
  82.     "messages":[HumanMessage(content="今天西安的天气怎么样?")]
  83. })
  84. print(f"一共调用llm{result['llm_calls']}次")
  85. for msg in result["messages"]:
  86.     msg.pretty_print()
复制代码
3.4基于LangGraph实现的RAG

构建一个智能文档问答系统,核心思路如下:
因此,我们需要:
最终graph示意图:
(, 下载次数: 0)


1.装备知识库并创建检索工具
  1. # 1.准备知识库,并创建检索工具
  2. # 创建聊天模型与嵌入式模型
  3. model = init_chat_model("gpt-4o-mini")
  4. embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")
  5. # 文档列表
  6. paths =[
  7.     "../Docs/MarkDown/城市展厅项目.md",
  8.     "../Docs/MarkDown/智能PPT导览项目.md",
  9.     "../Docs/MarkDown/记忆岛项目.md"
  10. ]
  11. # 文档加载器
  12. # 得到的一个结果的结构类似:
  13. # [Document(page_content="文档1内容", metadata={"source": "doc1.md"})]
  14. docs = [UnstructuredMarkdownLoader(path).load() for path in paths]
  15. # docs = [
  16. #     [Document(page_content="test1的内容", metadata={"source": "test1.md"})],
  17. #     [Document(page_content="test2的内容", metadata={"source": "test2.md"})]
  18. # ]
  19. # 把嵌套列表转换成扁平列表
  20. docs_list = [item for sublist  in docs for item in sublist]
  21. # docs_list = [
  22. #     Document(page_content="test1的内容", metadata={"source": "test1.md"}),
  23. #     Document(page_content="test2的内容", metadata={"source": "test2.md"})
  24. # ]
  25. # 使用分词器对文档进行分割
  26. text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
  27.     encoding_name="cl100k_base",
  28.     chunk_size=1000,
  29.     chunk_overlap=50
  30. )
  31. doc_splits = text_splitter.split_documents(docs_list)
  32. # 使用嵌入模型将文档转化为向量,使用内存数据库存储
  33. vector_store = InMemoryVectorStore.from_documents(
  34.     documents=doc_splits,
  35.     embedding=embedding_model
  36. )
  37. # 使⽤ LangChain 的预构建 create_retriever_tool 创建检索器⼯具
  38. retriever = vector_store.as_retriever(search_kwargs={"k": 2})
  39. retriever_tool = create_retriever_tool(
  40.     retriever,
  41.     "retrieve",
  42.     "搜索并返回有关城市展厅/智能PPT导览/记忆岛项目开发的信息。",
  43. )
复制代码
2.创建节点

generate_query_or_respond节点
该节点核心设计:
决定是直接回答还是检索文档
可以使用 model.bind_tools([retriever_tool]) 让模型能够调用检索工具
  1. # 2.定义状态
  2. # 此处可以自定义消息状态
  3. # 也可以直接使用LangGraph包中提供的MessagesState
  4. # class MessageState(TypedDict):
  5. #     #消息列表(AIMessage,ToolMessage,HumanMessage)
  6. #     messages:Annotated[list[AnyMessage],operator.add]
  7. # 3.定义节点
  8. # 定义决策节点(AI决策是否需要检索)
  9. # 调用LLM绑定检索工具的节点,判断是否需要调用检索工具
  10. def generate_query_or_respond(state:MessagesState):
  11.     """调用模型基于当前的状态生成响应,给定问题,它将决定是否使用工具进行检索,或者是直接给用户返回答案"""
  12.     result = model.bind_tools([retriever_tool]).invoke(state["messages"])
  13.     # result是AIMessage,可能包含tool_calls,也可能不包含
  14.     return {
  15.         "messages":[result]
  16.     }
复制代码
检索器工具节点retrieve
前⾯已经创建好了检索器工具。对于节点,可以使用 ToolNode 类来定义:
  1. # 定义工具节点,调用检索工具
  2. # 可以实现一个函数,函数中调用工具
  3. # 也可以直接使用LangGraph提供的ToolNode来生成工具节点
  4. retriever_node = ToolNode([retriever_tool])
复制代码
问题优化节点 rewrite_question
该节点主要负责当文档不相关时,重写问题以改进检索效果.
  1. # 让LLM重写问题的提示词
  2. REWRITE_PROMPT = (
  3.     "查看输⼊并尝试推断潜在的语义意图/含义。\n"
  4.     "这是最初的问题:"
  5.     "\n ------- \n"
  6.     "{question}"
  7.     "\n ------- \n"
  8.     "提出⼀个改进后的问题:"
  9. )
  10. # 定义重写节点(问题优化节点,优化用户提出的问题)
  11. def rewrite_node(state:MessagesState):
  12.     """重新用户原始问题,对用户的问题进行改进"""
  13.     messages = state["messages"]
  14.     # 此时的messages中包含
  15.     # HumanMessage(用户输入的问题)
  16.     # AIMessage(第一次调用LLM的结果,其中包含tool_calls)
  17.     # ToolMessage(工具调用的结果)
  18.     # 现在需要获取到HumanMessage,并调用LLM进行优化
  19.     question = messages[0].conent
  20.     prompt = REWRITE_PROMPT.format(question=question)
  21.     response = model.invoke([HumanMessage(content=prompt)])
  22.     #更新节点状态,把修改后的用户问题,设置为新的用户消息
  23.     return {
  24.         "messages":[HumanMessage(content=response.content)]
  25.     }
复制代码
答案⽣成节点 generate_answer
该节点将基于相关文档生成最终答案。
  1. # 定义答案节点
  2. # 定义答案生成的提示词
  3. GENERATE_PROMPT = (
  4.     "你是负责回答问题的助⼿。 "
  5.     "使⽤以下检索到的上下⽂⽚段来回答问题。 "
  6.     "如果你不知道答案,就说你不知道。 "
  7.     "最多只⽤三句话,回答要简明扼要。\n"
  8.     "Question: {question} \n"
  9.     "Context: {context}"
  10. )
  11. def generate_answer(state:MessagesState):
  12.     # 此时的消息中包含[HumanMessage,AIMessage,ToolMessage]
  13.     # 问题+检索结果
  14.     question = state["messages"][0].content   #HumanMessage
  15.     context = state["messages"][-1].content   #ToolMessage
  16.     prompt = GENERATE_PROMPT.format(question=question,context=context)
  17.     #调用LLM构建AIMessage,返回最终的结果
  18.     return {
  19.         "messages":[model.invoke(HumanMessage(context=prompt))]
  20.     }
复制代码
3.组装"工作流水线"

根据下图,开始组装"工作流水线"。
(, 下载次数: 0)


添加节点与入口点
  1. # 4.创建图,添加节点、边
  2. workflow = StateGraph(MessagesState)
  3. workflow.add_node(generate_query_or_respond)    #决策节点
  4. workflow.add_node("retriever",retriever_node)   #检索节点
  5. workflow.add_node(rewrite_node)                 #重写问题节点
  6. workflow.add_node(generate_answer)              #生成答案节点
  7. workflow.add_edge(START,"generate_query_or_respond")
  8. workflow.add_conditional_edges(
  9.     "generate_query_or_respond",
  10.     tools_condition,   #LangGraph提供的函数,检查是否需要调用工具
  11.     {
  12.         "tools":"retriever",
  13.         "__end__":END
  14.     }
  15. )
  16. #定义LLM调用后的结构化输出结果
  17. class GradeDocumnet(BaseModel):
  18.     score:str=Field(description="相关性评分,如果相关则为“yes”,如果不相关则为“no”")
  19. GRADE_PROMPT = (
  20.     "你是⼀个评分员,评估检索到的⽂档与⽤⼾问题的相关性。 \n "
  21.     "以下是检索到的⽂档: \n\n {context} \n\n"
  22.     "以下是⽤⼾的问题: {question} \n"
  23.     "如果⽂档包含与⽤⼾问题相关的关键字或语义,则将其评为相关。 \n"
  24.     "给出⼀个⼆元分数“yes”或“no”,以表明该⽂档是否与问题相关。"
  25. )
  26. #判断检索到的结果是否正确(正确则生成答案,有问题怎重写问题)
  27. def grade_documents(state:MessagesState)-> Literal["rewrite_node", "generate_answer"]:
  28.     """确定检索到的文档是否与问题相关"""
  29.     #将问题和检索到的文档交给LLM,判断是否相关(yes->rewrite_node,no->generate_answer)
  30.     #需要过滤出用户最新的消息和检索工具调用的结果
  31.     user_messages = filter_messages(state["messages"])
  32.     question = user_messages[-1].content
  33.     tool_message = state["messages"][-1]
  34.     context = tool_message.content
  35.     prompt = GRADE_PROMPT.format(context=context,question=question)
  36.     result = model.with_structured_output(GradeDocumnet).invoke(
  37.         [HumanMessage(content=prompt)]
  38.     ).content
  39.     if result.str=="yes":
  40.         return "generate_answer"
  41.     else:
  42.         return "rewrite_node"
  43. workflow.add_conditional_edges(
  44.     "retriever",
  45.     grade_documents,
  46.     ["generate_answer","rewrite_node"]
  47. )
  48. workflow.add_edge("generate_answer",END)
  49. workflow.add_edge("rewrite_node","generate_query_or_respond")
复制代码
编译并运行图
  1. # 5.编译图
  2. graph = workflow.compile()
  3. # 执行图
  4. # 流式输出
  5. for chunk in graph.stream(
  6.     {
  7.     "messages":[HumanMessage(content="记忆岛这个项目的开发用到了哪些技术?")]
  8.     }
  9. ):
  10.     print(chunk)
复制代码
4.完整代码
  1. # 基于LangGraph实现的RAG(检索增强生成)
  2. from typing import Literal
  3. from langchain.chat_models import init_chat_model
  4. from langchain_community.document_loaders import UnstructuredMarkdownLoader
  5. from langchain_core.messages import HumanMessage, filter_messages
  6. from langchain_core.tools import create_retriever_tool
  7. from langchain_core.vectorstores import InMemoryVectorStore
  8. from langchain_openai import OpenAIEmbeddings
  9. from langchain_text_splitters import RecursiveCharacterTextSplitter
  10. from langgraph.constants import START, END
  11. from langgraph.graph import MessagesState, StateGraph
  12. from langgraph.prebuilt import ToolNode, tools_condition
  13. from pydantic import BaseModel, Field
  14. # 1.准备知识库,并创建检索工具
  15. # 创建聊天模型与嵌入式模型
  16. model = init_chat_model("gpt-4o-mini")
  17. embedding_model = OpenAIEmbeddings(model="text-embedding-3-large")
  18. # 文档列表
  19. paths =[
  20.     "../Docs/MarkDown/城市展厅项目.md",
  21.     "../Docs/MarkDown/智能PPT导览项目.md",
  22.     "../Docs/MarkDown/记忆岛项目.md"
  23. ]
  24. # 文档加载器
  25. # 得到的一个结果的结构类似:
  26. # [Document(page_content="文档1内容", metadata={"source": "doc1.md"})]
  27. docs = [UnstructuredMarkdownLoader(path).load() for path in paths]
  28. # docs = [
  29. #     [Document(page_content="test1的内容", metadata={"source": "test1.md"})],
  30. #     [Document(page_content="test2的内容", metadata={"source": "test2.md"})]
  31. # ]
  32. # 把嵌套列表转换成扁平列表
  33. docs_list = [item for sublist  in docs for item in sublist]
  34. # docs_list = [
  35. #     Document(page_content="test1的内容", metadata={"source": "test1.md"}),
  36. #     Document(page_content="test2的内容", metadata={"source": "test2.md"})
  37. # ]
  38. # 使用分词器对文档进行分割
  39. text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
  40.     encoding_name="cl100k_base",
  41.     chunk_size=1000,
  42.     chunk_overlap=50
  43. )
  44. doc_splits = text_splitter.split_documents(docs_list)
  45. # 使用嵌入模型将文档转化为向量,使用内存数据库存储
  46. vector_store = InMemoryVectorStore.from_documents(
  47.     documents=doc_splits,
  48.     embedding=embedding_model
  49. )
  50. # 使⽤ LangChain 的预构建 create_retriever_tool 创建检索器⼯具
  51. retriever = vector_store.as_retriever(search_kwargs={"k": 2})
  52. retriever_tool = create_retriever_tool(
  53.     retriever,
  54.     "retrieve",
  55.     "搜索并返回有关城市展厅/智能PPT导览/记忆岛项目开发的信息。",
  56. )
  57. # 2.定义状态
  58. # 此处可以自定义消息状态
  59. # 也可以直接使用LangGraph包中提供的MessagesState
  60. # class MessageState(TypedDict):
  61. #     #消息列表(AIMessage,ToolMessage,HumanMessage)
  62. #     messages:Annotated[list[AnyMessage],operator.add]
  63. # 3.定义节点
  64. # 定义决策节点(AI决策是否需要检索)
  65. # 调用LLM绑定检索工具的节点,判断是否需要调用检索工具
  66. def generate_query_or_respond(state:MessagesState):
  67.     """调用模型基于当前的状态生成响应,给定问题,它将决定是否使用工具进行检索,或者是直接给用户返回答案"""
  68.     result = model.bind_tools([retriever_tool]).invoke(state["messages"])
  69.     # result是AIMessage,可能包含tool_calls,也可能不包含
  70.     return {
  71.         "messages":[result]
  72.     }
  73. # 定义工具节点,调用检索工具
  74. # 可以实现一个函数,函数中调用工具
  75. # 也可以直接使用LangGraph提供的ToolNode来生成工具节点
  76. retriever_node = ToolNode([retriever_tool])
  77. # 让LLM重写问题的提示词
  78. REWRITE_PROMPT = (
  79.     "查看输⼊并尝试推断潜在的语义意图/含义。\n"
  80.     "这是最初的问题:"
  81.     "\n ------- \n"
  82.     "{question}"
  83.     "\n ------- \n"
  84.     "提出⼀个改进后的问题:"
  85. )
  86. # 定义重写节点(问题优化节点,优化用户提出的问题)
  87. def rewrite_node(state:MessagesState):
  88.     """重新用户原始问题,对用户的问题进行改进"""
  89.     messages = state["messages"]
  90.     # 此时的messages中包含
  91.     # HumanMessage(用户输入的问题)
  92.     # AIMessage(第一次调用LLM的结果,其中包含tool_calls)
  93.     # ToolMessage(工具调用的结果)
  94.     # 现在需要获取到HumanMessage,并调用LLM进行优化
  95.     question = messages[0].conent
  96.     prompt = REWRITE_PROMPT.format(question=question)
  97.     response = model.invoke([HumanMessage(content=prompt)])
  98.     #更新节点状态,把修改后的用户问题,设置为新的用户消息
  99.     return {
  100.         "messages":[HumanMessage(content=response.content)]
  101.     }
  102. # 定义答案节点
  103. # 定义答案生成的提示词
  104. GENERATE_PROMPT = (
  105.     "你是负责回答问题的助⼿。 "
  106.     "使⽤以下检索到的上下⽂⽚段来回答问题。 "
  107.     "如果你不知道答案,就说你不知道。 "
  108.     "最多只⽤三句话,回答要简明扼要。\n"
  109.     "Question: {question} \n"
  110.     "Context: {context}"
  111. )
  112. def generate_answer(state:MessagesState):
  113.     # 此时的消息中包含[HumanMessage,AIMessage,ToolMessage]
  114.     # 问题+检索结果
  115.     question = state["messages"][0].content   #HumanMessage
  116.     context = state["messages"][-1].content   #ToolMessage
  117.     prompt = GENERATE_PROMPT.format(question=question,context=context)
  118.     #调用LLM构建AIMessage,返回最终的结果
  119.     return {
  120.         "messages":[model.invoke(HumanMessage(context=prompt))]
  121.     }
  122. # 4.创建图,添加节点、边
  123. workflow = StateGraph(MessagesState)
  124. workflow.add_node(generate_query_or_respond)    #决策节点
  125. workflow.add_node("retriever",retriever_node)   #检索节点
  126. workflow.add_node(rewrite_node)                 #重写问题节点
  127. workflow.add_node(generate_answer)              #生成答案节点
  128. workflow.add_edge(START,"generate_query_or_respond")
  129. workflow.add_conditional_edges(
  130.     "generate_query_or_respond",
  131.     tools_condition,   #LangGraph提供的函数,检查是否需要调用工具
  132.     {
  133.         "tools":"retriever",
  134.         "__end__":END
  135.     }
  136. )
  137. #定义LLM调用后的结构化输出结果
  138. class GradeDocumnet(BaseModel):
  139.     score:str=Field(description="相关性评分,如果相关则为“yes”,如果不相关则为“no”")
  140. GRADE_PROMPT = (
  141.     "你是⼀个评分员,评估检索到的⽂档与⽤⼾问题的相关性。 \n "
  142.     "以下是检索到的⽂档: \n\n {context} \n\n"
  143.     "以下是⽤⼾的问题: {question} \n"
  144.     "如果⽂档包含与⽤⼾问题相关的关键字或语义,则将其评为相关。 \n"
  145.     "给出⼀个⼆元分数“yes”或“no”,以表明该⽂档是否与问题相关。"
  146. )
  147. #判断检索到的结果是否正确(正确则生成答案,有问题怎重写问题)
  148. def grade_documents(state:MessagesState)-> Literal["rewrite_node", "generate_answer"]:
  149.     """确定检索到的文档是否与问题相关"""
  150.     #将问题和检索到的文档交给LLM,判断是否相关(yes->rewrite_node,no->generate_answer)
  151.     #需要过滤出用户最新的消息和检索工具调用的结果
  152.     user_messages = filter_messages(state["messages"])
  153.     question = user_messages[-1].content
  154.     tool_message = state["messages"][-1]
  155.     context = tool_message.content
  156.     prompt = GRADE_PROMPT.format(context=context,question=question)
  157.     result = model.with_structured_output(GradeDocumnet).invoke(
  158.         [HumanMessage(content=prompt)]
  159.     ).content
  160.     if result.str=="yes":
  161.         return "generate_answer"
  162.     else:
  163.         return "rewrite_node"
  164. workflow.add_conditional_edges(
  165.     "retriever",
  166.     grade_documents,
  167.     ["generate_answer","rewrite_node"]
  168. )
  169. workflow.add_edge("generate_answer",END)
  170. workflow.add_edge("rewrite_node","generate_query_or_respond")
  171. # 5.编译图
  172. graph = workflow.compile()
  173. # 执行图
  174. # 流式输出
  175. for chunk in graph.stream(
  176.     {
  177.     "messages":[HumanMessage(content="记忆岛这个项目的开发用到了哪些技术?")]
  178.     }
  179. ):
  180.     print(chunk)
复制代码
原文地址:https://blog.csdn.net/2401_82677021/article/details/159131621




欢迎光临 AI创想 (http://llms-ai.com/) Powered by Discuz! X3.4