AI创想

标题: LangGraph上手指南:快速构建AI工作流 [打印本页]

作者: 创想小编    时间: 6 天前
标题: LangGraph上手指南:快速构建AI工作流
作者:CSDN博客
一、什么是LangGraph?

LangGraph是一个用于构建、部署和运行基于状态的多步骤AI/Agent工作流的框架。它基于图论的概念,允许开发者创建由节点和边组成的工作流,实现复杂的AI应用逻辑。
核心价值

应用场景

一句话解释:工程化的Agent工作流搭建&运行框架,可以通过LangGraph快速搭建并运行AI-Workflow。
二、LangGraph的基本概念

1. LangGraph中的关键节点

概念英文描述
图/工作流Graph自动化工作流,通过节点和边的编排,实现自动化驱动的AI工作流应用。是整个系统的顶层容器。
状态State每个图都拥有的全局状态,状态可以存储任意数据,但只能由节点更新。状态在整个工作流中传递和累积。
节点Node单步工作容器,执行具体的工作项、更新状态、选择下一个节点等。是工作流的基本执行单元。
Edge定义节点间的流转关系,无需节点自行进行流转,系统会根据边的定义自动流转到下一个节点。
中断Interrupt图会暂停执行,将所有内容保存到检查点,后续可通过thread_id从中断处继续执行。用于需要用户交互的场景。
检查点Checkpoint图的运行状态快照,包含当前节点、状态数据、执行历史等信息。支持工作流的持久化和恢复。
起始节点START工作流的起点,所有执行都从这里开始。
结束节点END工作流的终点,到达这里表示工作流执行完成。
2. LangGraph的工作流程

执行流程

核心机制

三、LangGraph与LangChain的区别

核心定位

对比维度LangChainLangGraph
框架定位基础AI应用开发框架,提供LLM调用、工具集成等基础能力上层工作流编排框架,专注于复杂AI工作流和多智能体系统的搭建
工作流模型主要关注单个LLM调用和工具使用的链式组合,线性执行提供完整的状态管理和节点编排,支持复杂的分支、并行和条件执行
状态管理状态管理相对简单,主要通过链式传递,无全局状态拥有全局状态,可在整个工作流中持久化和更新,支持状态累积
并行执行主要是串行执行模式,缺乏原生并行支持原生支持节点并行执行,提高复杂工作流的执行效率
中断与恢复不支持执行中断和恢复,执行过程不可逆支持执行中断和从中断点恢复,适合需要用户交互的场景
扩展性适合构建相对简单的LLM应用,如问答、摘要等适合构建复杂的、多步骤的AI系统,如多智能体协作、复杂任务规划等
核心概念Chains(链)、Agents(代理)、Tools(工具)、Prompts(提示词)Nodes(节点)、Edges(边)、State(状态)、Interrupts(中断)、Checkpoints(检查点)
执行模式线性执行,每步依赖前一步结果,缺乏灵活性基于图的执行,支持复杂的分支、并行和条件流转
状态持久化无内置状态持久化机制,执行完成后状态丢失内置检查点机制,支持状态持久化和工作流恢复
用户交互有限的用户交互能力,主要是一次性输入输出强大的用户交互支持,可在任意点中断等待用户输入
适用场景简单的LLM应用、单步任务处理、基础工具集成复杂的多步骤工作流、多智能体协作系统、需要状态管理的应用、交互式AI系统
与LangChain的关系独立的基础框架可与LangChain集成,使用LangChain的组件构建节点,同时提供更强大的工作流能力
一句话解释:LangChain更偏向作为基础AI应用搭建的组件&工具链,LangGraph以前者为基础,更偏向于搭建复杂的、有状态的、AI工作流与更加复杂的AI应用
四、LangGraph快速上手

1. 环境搭建

步骤1:初始化项目
  1. mkdir langgraph-demo
  2. cd langgraph-demo
  3. npm init -y
复制代码
步骤2:安装依赖
  1. npminstall @langchain/core @langchain/langgraph typescript ts-node
复制代码
步骤3:创建tsconfig.json
  1. {"compilerOptions":{"target":"ES2022","module":"commonjs","moduleResolution":"node","esModuleInterop":true,"allowSyntheticDefaultImports":true,"strict":true,"skipLibCheck":true,"forceConsistentCasingInFileNames":true,"resolveJsonModule":true,"outDir":"./dist","rootDir":"./"},"include":["*.ts"],"exclude":["node_modules","dist"]}
复制代码
2. 上手案例

2.1 Graph示例

创建graph.ts文件
  1. import{
  2.   StateSchema,
  3.   GraphNode,
  4.   StateGraph,START,END,}from'@langchain/langgraph';import{ z }from'zod/v4';/**
  5. * 最小化计算器工作流案例,start=>sumNode=>mutliNode=>end
  6. */(async()=>{// 定义图全局状态const State =newStateSchema({
  7.     number1: z.number(),
  8.     number2: z.number(),
  9.     sum: z.number(),
  10.     mutli: z.number(),});// 创建求和节点const sumNode: GraphNode<typeof State>=(state)=>{return{ sum: state.number1 + state.number2 };};// 创建乘法节点const mutliNode: GraphNode<typeof State>=(state)=>{return{ mutli: state.number1 * state.number2 };};// 构建图const graph =newStateGraph(State).addNode('sumNode', sumNode).addNode('mutliNode', mutliNode).addEdge(START,'sumNode').addEdge('sumNode','mutliNode').addEdge('mutliNode',END).compile();// 执行图const result =await graph.invoke({ number1:2, number2:98});console.log('运行结果:', result);// 运行结果: { number1: 2, number2: 98, sum: 100, mutli: 196 }})();
复制代码
运行示例
  1. npx ts-node graph.ts
复制代码
2.2 Subgraph示例

创建subGraph.ts文件
  1. import{ StateGraph, StateSchema,START}from"@langchain/langgraph";import{ z }from"zod";/**
  2. * 主子图运行案例
  3. */(async()=>{// 定义子图状态const SubgraphState =newStateSchema({
  4.         result: z.string(),// 注意:这个键与父图状态共享});// 构建子图const subgraphBuilder =newStateGraph(SubgraphState).addNode("step1",(state)=>{return{ result: state.result +' -> 子图步骤1'};}).addNode("step2",(state)=>{return{ result: state.result +' -> 子图步骤2'};}).addEdge(START,"step1").addEdge("step1","step2");const subgraph = subgraphBuilder.compile();// 定义父图状态const ParentState =newStateSchema({
  5.         result: z.string(),});// 构建父图,将子图作为节点添加const builder =newStateGraph(ParentState).addNode("开始",(state)=>{return{ result: state.result +' -> 父图开始'};}).addNode("子图执行", subgraph)// 直接将子图作为节点添加.addEdge(START,"开始").addEdge("开始","子图执行");const graph = builder.compile();// 执行父图const result =await graph.invoke({ result:"初始值"});console.log("最终结果:", result);// 最终结果: { result: '初始值 -> 父图开始 -> 子图步骤1 -> 子图步骤2' }})();
复制代码
运行示例
  1. npx ts-node subGraph.ts
复制代码
2.3 CheckPoints示例

创建checkPoint.ts文件
  1. import{ StateGraph, StateSchema, GraphNode, MemorySaver }from"@langchain/langgraph";import*as z from"zod";/** 检查点案例 */(async()=>{// 图状态const State =newStateSchema({
  2.     topic: z.string(),// 主题
  3.     joke: z.string(),// 笑话
  4.     story: z.string(),// 故事
  5.     combinedOutput: z.string(),// 组合输出});// 节点// 第一个节点:生成笑话(替换了LLM调用)const callLlm1: GraphNode<typeof State>=async(state)=>{return{ joke:`为什么${state.topic}坐在电脑上?因为它想监视鼠标!`};};// 第二个节点:生成故事(替换了LLM调用)const callLlm2: GraphNode<typeof State>=async(state)=>{return{ story:`从前,有一只叫 whiskers 的${state.topic},它喜欢探索附近的街区。有一天,whiskers 发现了一个隐藏的花园,里面有五颜六色的花朵和友好的蝴蝶。这里成了 whiskers 每天下午最喜欢去的地方。`};};// 组合笑话、故事为单个输出const aggregator: GraphNode<typeof State>=async(state)=>{const combined =`这是关于${state.topic}的故事、笑话和诗歌!\n\n`+`故事:\n${state.story}\n\n`+`笑话:\n${state.joke}\n\n`;return{ combinedOutput: combined };};// 创建检查点容器const checkpointer =newMemorySaver();// 构建工作流const parallelWorkflow =newStateGraph(State).addNode("callLlm1", callLlm1).addNode("callLlm2", callLlm2).addNode("aggregator", aggregator).addEdge("__start__","callLlm1").addEdge("__start__","callLlm2").addEdge("callLlm1","aggregator").addEdge("callLlm2","aggregator").addEdge("aggregator","__end__").compile({ checkpointer });// 调用工作流 通过thread_id区分当前触发工作流的唯一标识,thread_id相同则会共享状态const result =await parallelWorkflow.invoke({ topic:"猫"},{ configurable:{ thread_id:"233"}});console.log("\n完整状态:");console.log(result);// 通过thread_id获取状态历史记录const stateHistory =await parallelWorkflow.getStateHistory({ configurable:{ thread_id:"233"}})forawait(const item of stateHistory){console.log(item)}// 通过thread_id重跑工作流const reRunResult =await parallelWorkflow.invoke({ topic:"狗"},{ configurable:{ thread_id:"233"}});console.log("\n重跑完整状态:");console.log(reRunResult);})()
复制代码
运行示例
  1. npx ts-node checkPoint.ts
复制代码
2.4 Interrupt示例

创建interrupt.ts文件
  1. import{ Annotation, Command,END,INTERRUPT, MemorySaver,START, StateGraph, interrupt, isInterrupted }from"@langchain/langgraph";/**
  2. * 最小化中断案例
  3. */(async()=>{// 1. 定义状态和节点const State = Annotation.Root({ user_input: Annotation<string>});// 2. 定义图节点   start => step1 => step2 => endconst graph =newStateGraph(State).addNode("step1",()=>{// 触发中断,图暂停const answer =interrupt("你的名字?")asstring;return{ user_input: answer };// 恢复后更新状态}).addNode("step2",(state)=>({ user_input:`Hello, ${state.user_input}!`})).addEdge(START,"step1").addEdge("step1","step2").addEdge("step2",END).compile({ checkpointer:newMemorySaver()});// 配置 checkpointer 存储状态// 3. 定义运行配置const config ={ configurable:{ thread_id:"unique-id"}};console.log("--- 1. 开始执行 ---");// 4. 第一次运行,遇到 interrupt 暂停const result1 =await graph.invoke({ user_input:""}, config);// 5. 检查是否中断if(isInterrupted(result1)){const info = result1[INTERRUPT][0];console.log(`--- 2. 中断: "${info.value}" (ID: ${info.id}) ---`);// 6. 模拟用户延迟输入let userNameInput =''awaitnewPromise(r =>setTimeout(()=>{console.log(`--- 3. 并模拟用户延迟输入名字`)
  4.       userNameInput ='Alice'r(userNameInput)},2000));console.log("--- 4. 恢复执行 ---");// 使用 Command 恢复,将 "Alice" 传回给 interrupt()const result2 =await graph.invoke(newCommand({ resume:{[info.id!]: userNameInput }})asany,
  5.       config
  6.     );console.log("最终结果:", result2);// --- 1. 开始执行 ---// --- 2. 中断: "你的名字?" (ID: 3972285f061e62e115f2826b26442662) ---// --- 3. 并模拟用户延迟输入名字// --- 4. 恢复执行 ---// 最终结果: { user_input: 'Hello, Alice!' }}})();
复制代码
运行示例
  1. npx ts-node interrupt.ts
复制代码
五、总结

LangGraph是一个强大的框架,用于构建复杂的AI工作流。它的核心优势包括:
六、更多参考资料


原文地址:https://blog.csdn.net/qq_44375977/article/details/158387039




欢迎光临 AI创想 (https://llms-ai.com/) Powered by Discuz! X3.4