作者:CSDN博客
LangGraph是一个用于构建状态化、多步骤AI应用的Python库,它基于Pregel计算模型,提供了构建复杂AI工作流的高级抽象。LangGraph通过图结构解决了传统AI应用在多步骤推理、状态管理、循环处理和人工干预等方面的局限性,支持状态化管理、图结构、可扩展性、容错性和可观察性等特性,适用于AI代理系统、多步骤工作流和数据处理流水线等场景。
在当今的 AI 应用开发中,我们经常面临多步骤、有状态、需要记忆的复杂场景。传统的线性处理方式已无法满足这些需求,这就催生了 LangGraph 框架的诞生。LangGraph 是一个专门用于构建状态化、多步骤 AI 应用的 Python 库,它基于 Pregel 计算模型,提供了构建复杂 AI 工作流的高级抽象。
什么是 LangGraph?
LangGraph 是一个用于构建状态化、多步骤 AI 应用的框架。与传统的单次调用 AI 模型不同,LangGraph 允许我们创建可以记住历史、做出决策、执行多个步骤并根据结果调整行为的复杂 AI 系统。
LangGraph 的核心特点包括:
状态化: 保持执行过程中的状态信息多步骤: 支持复杂的多步骤工作流可记忆: 通过检查点机制实现"短期记忆"可中断: 支持人工干预和决策可恢复: 支持暂停和恢复执行
为什么需要 LangGraph?
传统 AI 应用的局限性
传统的 AI 应用通常采用单次调用模式:输入 → AI 模型 → 输出。这种模式在以下场景中存在局限性:
多步骤推理: 需要多次调用 AI 模型进行推理状态管理: 需要记住历史交互信息循环处理: 需要根据结果决定是否继续人工干预: 需要在某些节点等待人工确认
LangGraph 解决的问题
LangGraph 通过图结构解决了上述问题:
图结构: 使用节点和边定义复杂的执行流程状态管理: 通过状态对象维护执行过程中的数据条件分支: 根据条件动态选择执行路径检查点: 持久化状态,支持恢复和重放
LangGraph 的核心概念
1. Pregel 模型
LangGraph 基于 Pregel 计算模型,这是一种用于大规模图计算的编程模型。Pregel 模型采用 Bulk Synchronous Parallel (BSP) 范式,将计算过程分为多个超步(Superstep):
每个超步包含三个阶段:
计划阶段: 确定此步骤中要执行的节点执行阶段: 并行执行选定的节点更新阶段: 更新通道中的值
2. StateGraph
StateGraph 是 LangGraph 中用于创建状态化工作流的主要抽象。它由以下几个核心组件构成:
状态(State): 定义图中共享的数据结构节点(Node): 执行特定功能的函数或可运行对象边(Edge): 定义节点间的执行顺序条件边(Conditional Edge): 基于条件动态选择下一个节点
- from typing import TypedDictfrom langgraph.graph import StateGraph, START, ENDclass State(TypedDict): value: intdef increment(state):return {"value": state["value"] + 1}def multiply_by_two(state):return {"value": state["value"] * 2}# 创建图builder = StateGraph(State)builder.add_node("increment", increment)builder.add_node("multiply", multiply_by_two)builder.add_edge(START, "increment")builder.add_edge("increment", "multiply")builder.add_edge("multiply", END)graph = builder.compile()# 执行图result = graph.invoke({"value": 1})print(result) # 输出: {'value': 4}
复制代码 3. Channels(通道)
Channels 是节点间通信的主要机制,定义了数据如何在图中流动。主要类型包括:
LastValue: 存储通道的最后值,适用于需要单一值的状态字段Topic: 发布-订阅通道,可累积多个值,适用于消息列表等场景BinaryOperatorAggregate: 使用二元操作符聚合值,适用于数值累加等场景
- from typing import TypedDictfrom typing_extensions import Annotatedimport operatorclass State(TypedDict):# 使用 reducer 累积消息 messages: Annotated[list, operator.add]# 使用 reducer 累积总数 total: Annotated[int, operator.add]
复制代码 4. Checkpoints(检查点)
检查点系统提供持久化状态的能力,支持暂停、恢复和重放:
- from langgraph.checkpoint.memory import MemorySaver# 创建检查点保存器memory = MemorySaver()graph = builder.compile(checkpointer=memory)# 执行图并持久化状态config = {"configurable": {"thread_id": "1"}}result = graph.invoke({"value": 0}, config)
复制代码 5. Interrupts(中断)
中断机制允许在图执行过程中暂停,以便进行人工干预:
- from langgraph.types import interruptdef node_with_interrupt(state):# 中断并等待用户输入 user_input = interrupt("请输入一个数字:")return {"user_input": str(user_input)}
复制代码 LangGraph 的应用场景
1. AI 代理系统
AI 代理通常需要多步骤推理、工具调用和状态管理:
- from langgraph.prebuilt import create_react_agentfrom langchain_openai import ChatOpenAIfrom langchain.tools import tool@tooldef get_weather(location: str) -> str:"""获取指定位置的天气"""return f"{location}的天气是晴朗的,温度22°C"tools = [get_weather]model = ChatOpenAI(model="gpt-3.5-turbo")agent = create_react_agent(model, tools)result = agent.invoke({"messages": [("user", "北京的天气如何?")]})
复制代码 2. 多步骤工作流
复杂的业务逻辑需要多个步骤协同完成:
- from typing import TypedDict, Literalfrom typing_extensions import Annotatedfrom langgraph.graph import StateGraph, START, ENDimport operatorclass Task(TypedDict): id: str description: str status: Literal["pending", "in_progress", "completed", "failed"] created_at: str completed_at: str | Noneclass TaskManagerState(TypedDict): tasks: Annotated[list[Task], operator.add] current_task_id: str | None completed_tasks: Annotated[list[Task], operator.add]def add_task(state): new_task = {"id": f"task_{len(state['tasks']) + 1}","description": f"Sample task {len(state['tasks']) + 1}","status": "pending","created_at": datetime.now().isoformat(),"completed_at": None }return {"tasks": [new_task]}def process_task(state): pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]ifnot pending_tasks:return {"current_task_id": None} task = pending_tasks[0]# 模拟任务处理 updated_task = {**task, "status": "completed", "completed_at": datetime.now().isoformat()}return {"current_task_id": task["id"],"completed_tasks": [updated_task] }def should_continue(state): pending_tasks = [t for t in state["tasks"] if t["status"] == "pending"]return"process"if pending_tasks else ENDbuilder = StateGraph(TaskManagerState)builder.add_node("add", add_task)builder.add_node("process", process_task)builder.add_conditional_edges(START, lambda s: "add")builder.add_conditional_edges("add", should_continue, ["process", END])builder.add_edge("process", "add")graph = builder.compile()
复制代码 3. 数据处理流水线
构建复杂的数据处理流水线:
- class DataPipelineState(TypedDict): raw_data: list processed_data: Annotated[list, operator.add] validation_errors: Annotated[list, operator.add] pipeline_status: strdef extract_data(state):"""从源提取数据""" raw_data = [ {"id": 1, "name": "Alice", "email": "alice@example.com"}, {"id": 2, "name": "Bob", "email": "invalid-email"}, {"id": 3, "name": "Charlie", "email": "charlie@example.com"} ]return {"raw_data": raw_data, "pipeline_status": "extracted"}def validate_data(state):"""验证数据""" errors = [] valid_data = []for item in state["raw_data"]:if"@"in item["email"] and"."in item["email"]: valid_data.append(item)else: errors.append(f"Invalid email for {item['name']}: {item['email']}") result = {"processed_data": valid_data}if errors: result["validation_errors"] = errorsreturn resultdef transform_data(state):"""转换数据格式""" transformed = []for item in state["processed_data"]: transformed.append({"user_id": item["id"],"full_name": item["name"].upper(),"contact": item["email"] })return {"processed_data": transformed, "pipeline_status": "completed"}builder = StateGraph(DataPipelineState)builder.add_node("extract", extract_data)builder.add_node("validate", validate_data)builder.add_node("transform", transform_data)builder.add_edge(START, "extract")builder.add_edge("extract", "validate")builder.add_edge("validate", "transform")builder.add_conditional_edges("transform", lambda s: END if s["pipeline_status"] == "completed"else"transform")graph = builder.compile()
复制代码 高级特性
1. 条件分支
根据条件动态选择执行路径:
- def should_continue(state):if state["value"] < 5:return"increment"else:return ENDdef increment(state):return {"value": state["value"] + 1}builder = StateGraph(State)builder.add_node("increment", increment)builder.add_conditional_edges(START, should_continue, ["increment", END])builder.add_edge("increment", START)graph = builder.compile()Result = graph.invoke({"value": 1})print(result) # 输出: {'value': 5}
复制代码 2. 并行处理
使用 Send API 实现并行处理:
- from langgraph.types import Senddef distribute_work(state):# 为每个项目创建任务return [Send("process_item", {"item": item}) for item in state["items"]]def should_continue(state):return"square"if len(state["results"]) < len(state["numbers"]) else END
复制代码 3. 错误处理和重试
内置的错误处理和重试机制:
- from langgraph.types import RetryPolicy# 定义重试策略retry_policy = RetryPolicy( initial_interval=0.5, backoff_factor=2.0, max_attempts=5, retry_on=Exception)builder.add_node("operation", unreliable_operation, retry_policy=retry_policy)
复制代码 4. 流式处理
支持多种流式输出模式:
- # 流式输出每个步骤的更新for chunk in graph.stream({"value": 1}, stream_mode="updates"): print(chunk)# 流式输出最终值for chunk in graph.stream({"value": 1}, stream_mode="values"): print(chunk)
复制代码 架构模式
1. ReAct 模式
ReAct (Reasoning + Acting) 模式是 LangGraph 中常用的 AI 代理模式:
2. Map-Reduce 模式
使用 Send API 实现 Map-Reduce 模式:
- def map_tasks(state):# 为每个项目创建任务return [Send("process", {"item": item}) for item in state["items"]]def reduce_results(state):# 合并结果return {"result": sum(state["partial_results"])}
复制代码 最佳实践
1. 状态设计
使用 TypedDict 或 Pydantic 模型明确定义状态为需要累积的字段使用适当的 reducer避免状态过于复杂,考虑分解为多个图
2. 错误处理
使用重试策略处理临时性错误
实现适当的错误恢复机制
记录和监控错误情况
3. 性能优化
使用检查点避免重复计算
合理设置递归限制
使用异步 API 提高并发性能
4. 调试和监控
使用流式输出监控执行过程
启用调试模式获取详细信息
使用检查点进行状态回溯
与其他组件的集成
1. LangChain 集成
LangGraph 与 LangChain 生态系统无缝集成:
与 LangChain 的 LLM、工具、记忆等组件无缝集成
使用 Runnable 接口实现节点
利用 LangChain 的回调系统
2. 外部系统集成
通过自定义节点集成数据库
与外部 API 交互
- 集成消息队列和事件系统
总结
LangGraph 提供了一个强大而灵活的框架来构建复杂的 AI 工作流。通过理解其核心概念(Pregel 模型、StateGraph、Channels、Checkpoints 等),开发者可以构建从简单到复杂的各种 AI 应用,从基本的数值处理到复杂的多步骤推理系统。
LangGraph 的主要优势包括:
状态化管理
提供了完整的状态管理机制
图结构
支持复杂的执行流程定义
可扩展性
支持子图、并行处理等高级特性
容错性
内置错误处理和重试机制
可观察性
提供丰富的监控和调试功能
随着 AI 应用复杂度的不断增加,LangGraph 为开发者提供了一个强大的工具,能够构建更加智能、灵活和可靠的 AI 系统。无论是构建 AI 代理、多步骤工作流,还是复杂的决策系统,LangGraph 都能提供相应的解决方案。
AI时代,未来的就业机会在哪里?
答案就藏在大模型的浪潮里。从ChatGPT、DeepSeek等日常工具,到自然语言处理、计算机视觉、多模态等核心领域,技术普惠化、应用垂直化与生态开源化正催生Prompt工程师、自然语言处理、计算机视觉工程师、大模型算法工程师、AI应用产品经理等AI岗位。
掌握大模型技能,就是把握高薪未来。
那么,普通人如何抓住大模型风口?
AI技术的普及对个人能力提出了新的要求,在AI时代,持续学习和适应新技术变得尤为重要。无论是企业还是个人,都需要不断更新知识体系,提升与AI协作的能力,以适应不断变化的工作环境。
因此,这里给大家整理了一份《2025最新大模型全套学习资源》,包括2025最新大模型学习路线、大模型书籍、视频教程、项目实战、最新行业报告、面试题等,带你从零基础入门到精通,快速掌握大模型技术!
由于篇幅有限,有需要的小伙伴可以扫码获取!
1. 成长路线图&学习规划
要学习一门新的技术,作为新手一定要先学习成长路线图,方向不对,努力白费。这里,我们为新手和想要进一步提升的专业人士准备了一份详细的学习成长路线图和规划。
2. 大模型经典PDF书籍
书籍和学习文档资料是学习大模型过程中必不可少的,我们精选了一系列深入探讨大模型技术的书籍和学习文档,它们由领域内的顶尖专家撰写,内容全面、深入、详尽,为你学习大模型提供坚实的理论基础。(书籍含电子版PDF)
3. 大模型视频教程
对于很多自学或者没有基础的同学来说,书籍这些纯文字类的学习教材会觉得比较晦涩难以理解,因此,我们提供了丰富的大模型视频教程,以动态、形象的方式展示技术概念,帮助你更快、更轻松地掌握核心知识。
4. 大模型项目实战
学以致用 ,当你的理论知识积累到一定程度,就需要通过项目实战,在实际操作中检验和巩固你所学到的知识,同时为你找工作和职业发展打下坚实的基础。
5. 大模型行业报告
行业分析主要包括对不同行业的现状、趋势、问题、机会等进行系统地调研和评估,以了解哪些行业更适合引入大模型的技术和应用,以及在哪些方面可以发挥大模型的优势。
6. 大模型面试题
面试不仅是技术的较量,更需要充分的准备。
在你已经掌握了大模型技术之后,就需要开始准备面试,我们将提供精心整理的大模型面试题库,涵盖当前面试中可能遇到的各种技术问题,让你在面试中游刃有余。
为什么大家都在学AI大模型?
随着AI技术的发展,企业对人才的需求从“单一技术”转向 “AI+行业”双背景。企业对人才的需求从“单一技术”转向 “AI+行业”双背景。金融+AI、制造+AI、医疗+AI等跨界岗位薪资涨幅达30%-50%。
同时很多人面临优化裁员,近期科技巨头英特尔裁员2万人,传统岗位不断缩减,因此转行AI势在必行!
这些资料有用吗?
这份资料由我们和鲁为民博士(北京清华大学学士和美国加州理工学院博士)共同整理,现任上海殷泊信息科技CEO,其创立的MoPaaS云平台获Forrester全球’强劲表现者’认证,服务航天科工、国家电网等1000+企业,以第一作者在IEEE Transactions发表论文50+篇,获NASA JPL火星探测系统强化学习专利等35项中美专利。本套AI大模型课程由清华大学-加州理工双料博士、吴文俊人工智能奖得主鲁为民教授领衔研发。
资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的技术人员,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。
大模型全套学习资料已整理打包,有需要的小伙伴可以微信扫描下方CSDN官方认证二维码,免费领取【保证100%免费】
原文地址:https://blog.csdn.net/ytt0523_com/article/details/156338769 |