AI创想

标题: [特殊字符]大模型应用开发必备!LangGraph框架详解:构建复杂多智能体系统,建议收藏学习 [打印本页]

作者: 米落枫    时间: 前天 07:35
标题: [特殊字符]大模型应用开发必备!LangGraph框架详解:构建复杂多智能体系统,建议收藏学习
作者:CSDN博客
LangGraph作为LangChain生态的重要扩展,通过有向图模型重构智能体工作流架构,支持状态管理、条件分支、循环等复杂控制流。本文详解其核心要素(State、Node、Edge)、状态合并策略、条件边、人机协作机制及多智能体架构(Supervisor、Swarm),并介绍Java版本实现,为构建复杂大模型应用提供强大基础设施。
1、LangGraph概述

1.1 什么是LangGraph

LangGraph 是LangChain 生态的一部分,专门用于构建基于大模型(LLM)的复杂、有状态、多智能体应用的框架,核心思想是将应用的工作流程抽象为一个有向图结构,通过节点和边来定义任务的执行步骤和逻辑流,从而提供了远超传统线性链式调用的灵活性和控制力。相比传统的线性执行模式,LangGraph 支持条件分支、循环、并行等复杂控制流,能够实现状态持久化、断点续跑、时间旅行、人机协作等高级功能,并提供了多智能体协作、层级架构等多种架构模式。在实际应用中,LangGraph已成功应用于智能客服、自动化运维、研究 Agent 等场景,展现出卓越的适应性和扩展性。
LangGraph在Github上的热度变化
(, 下载次数: 0)


1.2 为什么使用LangGraph

LangGraph 相比传统的线性执行模式具有显著的技术优势。
1.3 安装使用

安装LangGraph
  1. pip install -U langgraph
复制代码
使用LangGraph创建一个简单的Agent
  1. def get_weather(city: str) -> str:
  2. """获取指定城市的天气信息。
  3. Args:
  4. city: 城市名称
  5. Returns:
  6. 返回该城市的天气描述
  7. """
  8. return f"今天{city}是晴天"
  9. # 创建模型
  10. model = ChatOpenAI(
  11. model_name=model_name,
  12. base_url=base_url,
  13. api_key=api_key
  14. )
  15. # 使用LangGraph提供的API创建Agent
  16. agent = create_react_agent(
  17. model=model,          # 添加模型
  18. tools=[get_weather],  # 添加工具
  19. prompt="你是一个天气助手"
  20. )
  21. human_message = HumanMessage(content="今天深圳天气怎么样?")
  22. response = agent.invoke(
  23. {"messages": [human_message]}
  24. )
  25. print(response)
复制代码
(, 下载次数: 0)


运行模式:Agent可以通过两种主要模式执行
最大迭代次数:为了避免Agent无限循环执行,可以设置一个递归限制
  1. response = agent.invoke(
  2. {"messages": [{"role": "user", "content": "预定一个深圳到北京的机票"}]},
  3. {"recursion_limit": 10}   # 指定最大迭代次数
  4. )
复制代码
2、LangGraph核心

2.1 Graph(图)

(, 下载次数: 0)


图是一种由节点和边组成的用于描述节点之间关系的数据结构,分为无向图和有向图,有向图是带有方向的图。LangGraph通过有向图定义AI工作流中的执行步骤和执行顺序,从而实现复杂、有状态、可循环的应用程序逻辑。
2.2 LangGraph核心要素(State、Edge、Node)

1、State(状态)
在LangGraph中,State是一个贯穿整个工作流执行过程中的共享数据的结构,代表当前快照,它存储了从工作流开始到结束的所有必要的信息(历史对话、检索到的文档、工具执行结果等),在各个节点中共享,且每个节点都可以修改。State可以是TypedDict类型,也可以是pydantic中的BaseModel类型。
  1. # 定义状态
  2. class GraphState(TypedDict):
  3. process_data: dict # 默认更新策略为替换(后续会讲更新策略)
  4. # 创建一个状态图,并指定状态
  5. graph = StateGraph(GraphState)
复制代码
2、Node(节点)
Node是LangGraph中的一个基本处理单元,代表工作流中的一个操作步骤,可以是一个Agent、调用大模型、工具或一个函数(说白了就是绑定一个函数,具体逻辑可以干任何事情)。
Node的设计原则:
如下是添加一个节点的例子:
  1. # 定义一个节点,入参为state
  2. def input_node(state: GraphState) -> GraphState:
  3. print(state)
  4. return {"process_data": {"input": "input_value"}}
  5. # 定义带参数的node节点
  6. def process_node(state: dict, param1: int, param2: str) -> dict:
  7. print(state, param1, param2)
  8. return {"process_data": {"process": "process_value"}}
  9. graph = StateGraph(GraphState)
  10. # 添加inpu节点
  11. graph.add_node("input", input_node)
  12. # 给process_node节点绑定参数
  13. process_with_params = partial(process_node, param1=100, param2="test")
  14. # 添加带参数的node节点
  15. graph.add_node("process", process_with_params)
复制代码
特殊节点:
在LangGraph中有两个特殊的节点 __START__ (开始节点)和 __END__(结束节点)
__START__节点:开始节点,确定应该首先调用哪些节点。
  1. from langgraph.graph import START
  2. # 第一个执行的节点是 node_start
  3. graph.add_edge(START, "node_start")
复制代码
也可以通过graph.set_entry_point(“node_start”) 函数设置起始节点,等价于graph.add_edge(START, “node_start”)
__END__节点:终止节点,表示后续没有其他节点可以继续执行了(非必须)。
  1. from langgraph.graph import END
  2. # node_end 节点执行后,没有后续节点了
  3. graph.add_edge("node_a", END)
复制代码
也可以通过graph.set_finish_point(“node_end”) 函数设置结束节点,等价于graph.add_edge(“node_start”, END)
错误处理和重试机制:
LangGraph还提供了错误处理和重试机制来指定重试次数、重试间隔、重试异常等,用于保证系统的可靠性。
  1. # 重试策略
  2. retry_policy = RetryPolicy(
  3. max_attempts=3,                       # 最大重试次数
  4. initial_interval=1,                   # 初始间隔
  5. jitter=True,                          # 抖动(添加随机性避免重试风暴)
  6. backoff_factor=2,                     # 退避乘数(每次重试间隔时间的增长倍数)
  7. retry_on=[RequestException, Timeout]  # 只重试这些异常
  8. )
  9. graph.add_node("process", process_node, retry=retry_policy)
复制代码
节点缓存:
LangGraph 支持根据节点输入对节点进行缓存,用于加快节点的响应速度
  1. class State(TypedDict):
  2. x: int
  3. result: int
  4. builder = StateGraph(State)
  5. def expensive_node(state: State) -> dict[str, int]:
  6. # 模拟耗时
  7. time.sleep(2)
  8. return {"result": state["x"] * 2}
  9. #添加节点,并指定缓存策略
  10. builder.add_node("expensive_node", expensive_node, cache_policy=CachePolicy(ttl=3))
  11. builder.set_entry_point("expensive_node")
  12. builder.set_finish_point("expensive_node")
  13. graph = builder.compile(cache=InMemoryCache())
复制代码
3、Edge(边)
Edge定义了节点之间的连接和执行顺序,以及不同节点之间是如何通讯的,一个节点可以有多个出边(指向多个节点),多个节点也可以指向同一个节点(Map-Reduce),如下是添加边的代码:
  1. # 添加固定边,执行顺序:start -> input -> process -> output -> end
  2. graph.add_edge(START, "input")
  3. graph.add_edge("input", "process")
  4. graph.add_edge("process", "output")
  5. graph.add_edge("output", END)
  6. # 编译图,保证生成的图是正确的,如果添加了边,没添加节点,会报错
  7. app = graph.compile()
  8. app.invoke({})
复制代码
4、构建一个完整的图
图的构建流程:1、初始化一个StateGraph实例。2、添加节点。3、定义边,将所有的节点连接起来。4、设置特殊节点,入口和出口(可选)。5、编译图。6、执行工作流。
  1. # 定义状态
  2. class GraphState(TypedDict):
  3. process_data: dict
  4. def input_node(state: GraphState) -> GraphState:
  5. print(state)
  6. return {"process_data": {"input": "input_value"}}
  7. def output_node(state: GraphState) -> GraphState:
  8. print(state)
  9. return {"process_data": {"output": "output_value"}}
  10. def process_node(state: dict) -> dict:
  11. print(state)
  12. return {"process_data": {"process": "process_value"}}
  13. # 创建一个状态图,并指定状态
  14. graph = StateGraph(GraphState)
  15. # 添加input、process、output节点
  16. graph.add_node("input", input_node)
  17. graph.add_node("process", process_node)
  18. graph.add_node("output", output_node)
  19. # 添加固定边,执行顺序:start -> input -> process -> output -> end
  20. graph.add_edge(START, "input")
  21. graph.add_edge("input", "process")
  22. graph.add_edge("process", "output")
  23. graph.add_edge("output", END)
  24. # 编译图,保证生成的图是正确的,如果添加了边,没添加节点,会报错
  25. app = graph.compile()# 执行
  26. app.invoke({})
复制代码
(, 下载次数: 0)


2.3 状态合并策略(Reducers)

LangGraph工作流中,State作为贯穿整个节点之间共享数据的结构,每一个节点都可以读取当前State的数据,并且可以更新。Reducer是定义多个节点之间State如何更新的(覆盖、合并、添加等)。
1、直接覆盖:如果没有为状态字段指定 Reducer,默认会覆盖更新。也就是说,后执行的节点返回的值会直接覆盖先执行节点的值,即下一个节点的State数据是上一个节点的返回。
  1. class OverrideState(TypedDict):
  2. process_data : dict  # 未指定合并策略,默认覆盖,上一个节点的返回是下一个节点的值
复制代码
2、Annotated:使用类型注解指定合并策略
  1. class AddState(TypedDict):
  2. data_int: Annotated[int, add]   # 数字相加
  3. data_list: Annotated[list, add] # 合并两个列表
  4. data_str: Annotated[str, add]   # 字符串拼接
  5. def add_node1(state: AddState) -> AddState:
  6. print(state)
  7. return {"data_int": 1, "data_list": [1], "data_str": "hello "}
  8. def add_node2(state: AddState) -> AddState:
  9. print(state)
  10. return {"data_int": 2, "data_list": [2], "data_str": "world"}
复制代码
3、内置Reducer:add_messages(消息列表合并)
LangGraph提供的专用Reducer函数,能智能的合并消息列表,不只是简单的追加,add_messages能够保证消息列表正确被累计,常用在多轮对话系统中,主要逻辑包括:
  1. class MessageState(TypedDict):
  2. # 消息列表,使用add_messages合并消息列表
  3. messages: Annotated[list, add_messages]
  4. def system_node(state: MessageState) -> dict:
  5. return {"messages": [SystemMessage(content="你是一个精通LangGraph的专家工程师.")]}
  6. def user_input_node(state: MessageState) -> dict:
  7. return {"messages": [HumanMessage(content="什么是LangGraph?")]}
  8. def ai_response_node(state: MessageState) -> dict:
  9. return {"messages": [AIMessage(content="LangGraph是一个...")]}
  10. def tool_node(state: MessageState) -> dict:
  11. return {"messages": [ToolMessage(content="工具调用参数params1", tool_call_id="tool_call_id")]}
复制代码
4、自定义Reducer:实现自定义合并逻辑
  1. def merge_dict_reducer(source: dict, new: dict) -> dict:
  2. # 自定义合并逻辑
  3. result = source.copy()
  4. result.update(new)
  5. return result
  6. def max_reducer(source: int, new: int) -> int:
  7. # 自定义合并逻辑
  8. return max(source, new)
  9. class CustomReducerState(TypedDict):
  10. # 使用自定义Reducer的状态
  11. max_score: Annotated[int, max_reducer]  # 保留最大值
  12. metadata: Annotated[dict, merge_dict_reducer]  # 字典合并
复制代码
2.4 条件边(Conditional Edge)

实际应用中,工作流的下一个节点可能并不是固定的,需要根据当前的执行状态去确定需要路由到哪一个节点。条件边可以动态控制执行流程,LangGraph中可以指定路由函数,来选择具体要执行的节点(可以是多个节点)
(, 下载次数: 0)

  1. def route_by_sentiment(state: GraphState) -> str:
  2. # 路由逻辑...返回最终的条件
  3. return "condition_1"
  4. graph = StateGraph(GraphState)
  5. graph.add_node("node1", node1)
  6. graph.add_node("node2", node2)
  7. graph.add_node("node3", node3)
  8. # 添加路由函数,参数:当前节点,路由函数,路由函数返回的条件与node的映射
  9. graph.add_conditional_edges(
  10. START,
  11. route_by_sentiment,
  12. {
  13. "condition_1": "node1",
  14. "condition_2": "node2",
  15. "condition_3": "node3"
  16. }
  17. )
  18. # 所有处理节点都连接到END
  19. graph.add_edge("node1", END)
  20. graph.add_edge("node2", END)
  21. graph.add_edge("node3", END)
  22. app = graph.compile()
复制代码
LangGraph 提供了图的可视化,可以通过调用函数保存图,用于查看工作流是否与预期定义的规则一致。
  1. png_data = app.get_graph().draw_mermaid_png()
  2. with open("graph.png", "wb") as f:
  3. f.write(png_data)
复制代码
(, 下载次数: 0)


2.5 Send 和 Command

Send和Command是两种用于实现高级工作流控制的核心机制,用于支持动态地决定下一步执行哪个节点
1、Send:动态创建多个执行分支,实现并行处理,每个Send对象都指定了一个执行目标节点和传递给该节点的参数,LangGraph会并行执行所有的这些任务。比如可以用在Map-Reduce的场景,并行执行多个子节点并最终汇总到一个总节点。
(, 下载次数: 0)

  1. def route_tasks(state: MapReduceState) -> list[Send]:
  2. # 为每个任务创建一个Send对象
  3. sends = []
  4. for idx, task in enumerate(state['tasks']):
  5. # 创建node任务及相应的参数
  6. send = Send("process_task",{"task_id": idx,"task_name": task})
  7. sends.append(send)
  8. # 返回所有的目标节点
  9. return sends
  10. # 路由函数,返回 Send 列表
  11. graph.add_conditional_edges("generate_tasks", route_tasks)
  12. # 所有process_task完成后,汇总结果
  13. graph.add_edge("process_task", "reduce_results")
复制代码
2、Command:不仅可以指定下一个节点,还支持更新状态、处理中断恢复,以及在嵌套图之间导航。常用于复杂的人机交互(Human-in-the-loop)和多智能体协同工作中智能体与智能体之间交接执行权(handoffs)
(, 下载次数: 0)

  1. # 在节点函数中返回 Command 来实现动态路由
  2. def agent_node(state: State) -> Command:
  3. if need_help(state):
  4. # 决定将任务移交给另一个node,并更新状态
  5. return Command(
  6. goto="expert_agent",
  7. update={"messages": state["messages"] + [new_message]}
  8. )
  9. else:
  10. return Command(goto="END")
复制代码
Command与条件边的区别是:条件边只会路由下一个node节点,而Command不仅路由下一个node节点,还支持状态更新,如果需要同时更新状态和路由到不同的节点时,则使用 Command。
2.6 状态持久化

(, 下载次数: 0)


状态持久化指的是在程序运行时将瞬间的状态保存下来,以便后续需要的时候能够重新恢复执行,用于解决因为程序退出、重启等事件而丢失任务。在 LangGraph 如果使用了持久化,工作流执行的每个步骤结束后,系统会自动将当前整个图的状态(包括所有变量、历史消息、下一步要执行的节点等信息)完整地保存下来,这份存档就是一个检查点(Checkpoint),LangGraph支持存储在内存、Redis、DB等存储介质中。检查点通过thread_id(会话id,不是操作系统中的线程id)区分不同的会话,后续重新执行时会使用。
  1. memory = MemorySaver()
  2. app = graph.compile(checkpointer=memory)   # 使用内存保存检查点
  3. config = {"configurable": {"thread_id": "recovery_thread"}}  # 必须配置会话ID
  4. result = app.invoke({"value": 5, "operations": []}, config=config)
  5. # 获取所有的检查点
  6. checkpoints = list(app.get_state_history(config))
  7. # 恢复:从指定检查点继续执行
  8. recovery_config = checkpoints[2].config
  9. recovered_result = app.invoke(None, config=recovery_config)
复制代码
检查点是由一个StateSnapshot对象表示,具有以下关键属性:
  1. class StateSnapshot(NamedTuple):
  2. """Snapshot of the state of the graph at the beginning of a step."""
  3. values: dict[str, Any] | Any
  4. """Current values of channels."""
  5. next: tuple[str, ...]
  6. """The name of the node to execute in each task for this step."""
  7. config: RunnableConfig
  8. """Config used to fetch this snapshot."""
  9. metadata: CheckpointMetadata | None
  10. """Metadata associated with this snapshot."""
  11. created_at: str | None
  12. """Timestamp of snapshot creation."""
  13. parent_config: RunnableConfig | None
  14. """Config used to fetch the parent snapshot, if any."""
  15. tasks: tuple[PregelTask, ...]
  16. """Tasks to execute in this step. If already attempted, may contain an error."""
  17. interrupts: tuple[Interrupt, ...]
  18. """Interrupts that occurred in this step that are pending resolution."""
复制代码
(, 下载次数: 0)


2.7 时间旅行

如果使用状态持久化,则LangGraph在执行每一个节点的时候都会将整个图的状态及相关信息保存下来,包括所有变量、消息历史、下一步要执行的节点等,因此在任何一个节点都可以重新恢复当前的执行流程。LangGraph的时间旅行就是用来回溯、检查、修改一个工作流执行过程中的历史状态,并从某个历史节点重新执行,从而实现对智能体决策过程的调试、分析和路径探索。常用在以下场景:
  1. checkpoints = list(app.get_state_history(config))
  2. # 查看所有的步骤,注意,checkpoints的第一个值是Graph执行的最后一个节点(顺序是反的)
  3. for i, checkpoint in enumerate(checkpoints):
  4. print(f"步骤 {i}: 下一节点 {checkpoint.next}, 状态值 {checkpoint.values}")
  5. # 获取一个检查点
  6. checkpoint = checkpoints[2]
  7. # 更新状态,这会替换整个data_list
  8. app.update_state(
  9. checkpoint.config,
  10. {"data_list": ["updated_value"]}  # 完全替换状态
  11. )
  12. # 从更新后的检查点继续执行
  13. result = app.invoke(None, config=checkpoint.config)
复制代码
2.8 人机协作(Human-in-the-Loop)

在一个多Agent架构中,有时并非全自动化处理,可能需要人工参与才能继续后续的操作(比如我们在使用CodeBuddy编程或执行某个命令前,都需要人工确认是否采纳或执行)。HIL就是通过在关键节点引入人类干预,实现 AI 系统的可控性和准确性。人机协作能弥补 AI 的 “能力盲区” 和人类的 “效率瓶颈”,在保证处理速度的同时,大幅提升结果的准确性、安全性和适用性。
(, 下载次数: 0)


LangGraph通过中断机制、状态持久化、恢复执行机制在Agent自动化工作流中嵌入人工干预,实现人机协同。
  1. def human_feedback_node(state: HumanInLoopState) -> dict:
  2. # 定义中断信息,告诉外界为何中断以及需要什么样的输入来恢复
  3. interrupt_data = {
  4. "type": "human_review",
  5. "request": state['request'],
  6. "analysis": state['analysis'],
  7. "prompt": "请输入: 同意 / 拒绝"
  8. }
  9. # 使用interrupt()函数暂停工作流,等待人工输入
  10. human_response = interrupt(interrupt_data)
  11. print(f"收到用户输入: {human_response}")
  12. # 解析人工输入,其他业务逻辑
  13. # ...
  14. return {
  15. "human_feedback": human_response.get("feedback"),
  16. "approved": human_response.get("decision"),
  17. "messages": [f"人工反馈: {human_response.get('feedback')}"]
  18. }
  19. # 添加人工反馈节点,其他节点省略
  20. graph.add_node("human_feedback", human_feedback_node)
  21. # 添加人工反馈边,其他节点省略
  22. graph.add_edge("analyze", "human_feedback")
  23. # 添加条件边,根据用户反馈来选择调用后续的节点
  24. builder.add_conditional_edges(
  25. "human_feedback",
  26. route_by_human_decision,
  27. {
  28. "process_approval": "process_approval",
  29. "process_rejection": "process_rejection"
  30. }
  31. )
  32. memory = MemorySaver()
  33. # 编译图并启用检查点
  34. app = graph.compile(checkpointer=memory)
  35. # 配置会话id,用于区分不同的会话
  36. config = {"configurable": {"thread_id": str(uuid.uuid4())}}
  37. # 首次执行图,执行到human_feedback节点会中断,invoke立即返回# 返回的结果会包含中断信息,result.get(__interrupt__)
  38. result = graph.invoke(initial_input, config)
  39. # 模拟人工输入(实际应用中来自用户界面)
  40. human_decision = {
  41. "decision": "同意",
  42. "feedback": "用户反馈"
  43. }
  44. # 重新恢复工作流,继续执行后续节点
  45. resume_command = Command(resume=human_decision)
  46. final_result = graph.invoke(resume_command, config)
复制代码
(, 下载次数: 0)


注意:调用interrupt()函数后,不会阻塞,当次的invoke调用会正常结束,并将一个包含中断信息的结果返回给调用方,并执行后续的代码,等重新调用graph.invoke(resume_command, config)时,会从调用interrupt()函数的入口处重新执行(注意:如果函数的interrupt调用之前有一些接口、db访问或其他业务逻辑,则会被重复调用),且执行到interrupt()处返回的值即是用户输入的值(Command(resume=human_decision)中指的的值),具体流程如下:
(, 下载次数: 0)


2.9 记忆

记忆是智能体运行中记住先前交互信息的组件,是能够连贯对话的核心能力,LangGraph中提供了短期记忆和长期记忆。
(, 下载次数: 0)


1、短期记忆:存储当前对话上下文的信息,作用于单次会话或线程,通过thread_id(会话id)区分,通过图状态(State)和检查点(Checkpoint)实现。
  1. # 1. 初始化一个内存检查点
  2. checkpointer = InMemorySaver()
  3. # 2. 在编译图时传入检查点
  4. graph = builder.compile(checkpointer=checkpointer)
  5. # 3. 调用时通过 thread_id 指定会话线程
  6. config = {"configurable": {"thread_id": "thread_123"}}
  7. result = graph.invoke({"messages": [{"role": "user", "content": "你好"}]}, config=config)
复制代码
2、长期记忆:长期记忆用于存储那些需要在不同会话间保留的信息。它通过 存储库(Store) 接口实现,类似于一个键值数据库,并支持基于向量嵌入的语义检索。与线程范围的短期记忆不同,长期记忆保存在自定义的“命名空间”中。
  1. def write_node(state: dict) -> dict:
  2. # 获取全局存储实例
  3. store = get_store()
  4. # 存储数据到指定命名空间
  5. store.put(namespace, "user_123", {"name": "张三", "age": "20"})
  6. return {}
  7. def read_node(state: dict) -> dict:
  8. # 获取全局存储实例
  9. store = get_store()
  10. # 根据键获取指定用户数据
  11. user_info = store.get(namespace, "user_123")
  12. print(user_info)
  13. # 在命名空间中搜索包含"张三"的数据
  14. # query: 搜索关键词
  15. # limit: 返回结果的最大数量限制
  16. user_info = store.search(namespace, query="张三", limit=10)
  17. print(user_info)
  18. return {}
  19. # 初始化一个内存存储
  20. store = InMemoryStore()
  21. # 定义命名空间,命名空间元组,用于数据分类和隔离
  22. namespace = ("users", "profile")
  23. # 创建图
  24. graph = StateGraph(dict)
  25. graph.add_node("write_node", write_node)
  26. graph.add_node("read_node", read_node)
  27. graph.add_edge(START, "write_node")
  28. graph.add_edge("write_node", "read_node")
  29. graph.add_edge("read_node", END)
  30. # 编译图,并指定存储
  31. app = graph.compile(store=store)
  32. app.invoke({})
复制代码
(, 下载次数: 0)


2.10 子图

在LangGraph中允许将一个完整的图作为另一个图的节点,适用于将复杂的任务拆解为多个专业智能体协同完成,每个子图都可以独立开发、测试并且可以复用。每个子图都可以拥有自己的私有数据,也可以与父图共享数据。
(, 下载次数: 0)

  1. # 定义父图状态
  2. class ParentState(TypedDict):
  3. parent_messages: list  # 与子图共享数据
  4. # 定义子图状态
  5. class SubgraphState(TypedDict):
  6. parent_messages: list  # 与父图共享的数据
  7. sub_message: str  # 子图私有数据
  8. # 创建子图,添加node、edge等
  9. sub_builder = StateGraph(SubgraphState)
  10. sub_builder.add_node("sub_node", subgraph_node)
  11. sub_builder.add_edge(START, "sub_node")
  12. compiled_subgraph = sub_builder.compile()
  13. # 创建父图
  14. builder = StateGraph(ParentState)
  15. # 添加子图添加为父图的节点
  16. builder.add_node("subgraph_node", compiled_subgraph)
  17. # 将子图连接起来
  18. builder.add_edge("parent_node", "subgraph_node")
  19. # 编译父图并执行
  20. parent_graph = builder.compile()
  21. parent_graph.invoke({"messages": ["init message"]})
复制代码
这里共享数据指的是如果父图状态与子图状态定义名一样,则状态是共享的 。
如果当父子图状态结构不同时,需要在父图中创建一个专门的节点函数,手动调用图并处理状态数据。
  1. # 在父图中创建代理节点处理状态转换
  2. def call_subgraph(state: ParentState):
  3. # 将父图状态转换为子图的输入
  4. subgraph_input = {"analysis_input": state["user_query"]}
  5. # 调用子图
  6. subgraph_response = compiled_subgraph.invoke(subgraph_input)
  7. # 将子图的输出映射回父图状态
  8. return {"final_answer": subgraph_response["analysis_result"]}
  9. builder = StateGraph(ParentState)
  10. # 父图中添加的是代理节点,而不是直接添加子图
  11. builder.add_node("call_subgraph_node", call_subgraph)
  12. builder.add_edge(START, "call_subgraph_node")
  13. parent_graph = builder.compile()
复制代码
2.11 集成MCP

模型上下文协议(MCP)是一个开放协议,它标准化了应用程序如何向大模型提供工具和上下文。LangGraph中Agent可以通过 langchain-mcp-adapters 库使用在 MCP 服务器上定义的工具。
  1. # 安装
  2. pip install langchain-mcp-adapters
复制代码
(, 下载次数: 0)


(, 下载次数: 0)


1、自定义MCP工具
  1. # 创建名为"MCP_Tools"的MCP服务器
  2. mcp = FastMCP("MCP_Tools")
  3. @mcp.tool()
  4. def get_weather(location: str) -> str:
  5. """获取指定位置的天气信息"""
  6. return "晴天"
  7. @mcp.tool()
  8. def get_time() -> str:
  9. """获取当前时间"""
  10. return datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  11. @mcp.tool()
  12. def add(a: int, b: int) -> int:
  13. """对两个整数相加"""
  14. return a + b
  15. @mcp.tool()
  16. def multiply(a: int, b: int) -> int:
  17. """对两个整数相乘"""
  18. return a * b
  19. @mcp.tool()
  20. def subtract(a: int, b: int) -> int:
  21. """对两个整数相减"""
  22. return a - b
  23. if __name__ == "__main__":
  24. # 使用HTTP协议传输
  25. mcp.run(transport="streamable-http")
复制代码
2、创建ReAct类型的Agent
  1. async def get_agent():
  2. # 初始化MCP客户端,可以连接多个服务器
  3. client = MultiServerMCPClient(
  4. {
  5. "weather": {
  6. "url": "http://localhost:8000/mcp",
  7. "transport": "streamable_http",
  8. }
  9. }
  10. )
  11. # 获取所有可用的工具
  12. tools = await client.get_tools()
  13. print(f"已加载工具: {[tool.name for tool in tools]}")
  14. # 初始化聊天模型
  15. model = ChatOpenAI(
  16. model_name=model_name,
  17. base_url=base_url,
  18. api_key=api_key
  19. )
  20. # 创建React智能体
  21. return create_react_agent(model, tools)
复制代码
3、构建测试用例
  1. async def test_agent():
  2. # 获取智能体
  3. agent = await get_agent()
  4. # 测试用例
  5. test_cases = [
  6. "计算 (15 + 7) × 3 等于多少?",
  7. "先计算 20 减 8,然后乘以 2 是多少?",
  8. "现在几点了?深圳的天气如何?"
  9. ]
  10. for i, question in enumerate(test_cases, 1):
  11. print(f"\n{'=' * 50}")
  12. print(f"测试 {i}: {question}")
  13. print(f"{'=' * 50}")
  14. # 调用智能体
  15. response = await agent.ainvoke(
  16. {"messages": [HumanMessage(content=question)]}
  17. )
  18. # 获取最后一条消息(智能体的回复)
  19. last_message = response["messages"][-1]
  20. print(f"智能体回复: {last_message.content}")
  21. if __name__ == "__main__":
  22. asyncio.run(test_agent())
复制代码
运行结果:
(, 下载次数: 0)


2.12 运行时上下文

创建图时,可以指定运行时上下文,将上下文信息(不属于图状态的信息)传递给节点,以便节点中使用,例如模型名称或数据库连接等。
  1. @dataclass
  2. class ContextSchema:    # 定义上下文schema
  3. llm_provider: str = "openai"
  4. def node_a(state: State, runtime: Runtime[ContextSchema]):
  5. # 获取上下文信息
  6. llm = get_llm(runtime.context.llm_provider)
  7. return state
  8. graph = StateGraph(State, context_schema=ContextSchema)
  9. # 执行时指定上下文信息
  10. graph.invoke(inputs, context={"llm_provider": "DeepSeek-R1-Online-0120"})
复制代码
2.13 递归限制

递归限制指的是图在单次执行过程中的最大次数,由 recursion_limit 参数控制,默认值为25步,一旦超过限制,会抛出 GraphRecursionError错误,用于防止工作流陷入死循环,确保系统的稳定性和可预测性。
  1. try:
  2. result = graph.invoke(
  3. {"input": "开始执行"},
  4. config={"recursion_limit": 50}  # 设置递归限制为50次
  5. )
  6. except GraphRecursionError:
  7. print("执行步数超过限制,抛出异常")
  8. # 异常处理...
复制代码
3、Multi-Agent架构

3.1 多智能体架构概述

对于普通的业务系统,随着需求的迭代,系统的复杂度会变得越来越高,使得维护性和扩展性变得越来越高,经常需要花费大量是时间去定位问题,因此在项目初始阶段架构选择很重要。单智能体应用也是如此,比如:
为了解决这些问题,可以将应用程序分解为多个更小、独立的智能体,并将它们组合成一个多智能体系统。使用多智能体的好处是:
多智能体架构:
(, 下载次数: 0)


在多智能体系统中,有几种常见的连接智能体的方式:
3.2 Agent之间通信和状态管理

在构建多智能体应用时,需要考虑智能体与智能体之间如何进行交互,以及数据应该如何共享。
(, 下载次数: 0)


1、通信模式:常见的两种通讯模式是通过移交(handoffs)和工具调用
移交更适用于自主协作的场景,而工具调用则提供了更明确的层级控制和接口约束。
2、消息传递:Agent与Agent之间应该传递所有的消息还是部分消息,需要根据具体的业务场景权衡。
(, 下载次数: 0)


3.3 supervisor(主管)

每个子智能体由一个中央主管智能体协调。主管控制所有的通信流和任务委派,根据当前上下文和任务需求决定调用哪个智能体。
Supervisor 架构模仿了企业中“项目经理”的角色。它采用经典的“管理者-工作者”结构,由一个中心的主管代理(Supervisor)负责接收用户任务,并将其分解、委派给各个专业的工作者代理(Worker Agents),并最终整合结果。
(, 下载次数: 0)


LangGraph提供了专门的Supervisor Python库:
  1. pip install langgraph-supervisor
复制代码
(, 下载次数: 0)

  1. def book_hotel(hotel_name: str):
  2. """预订酒店"""
  3. print(f"✅ 成功预订了 {hotel_name} 的住宿")
  4. return f"成功预订了 {hotel_name} 的住宿。"
  5. def book_flight(from_airport: str, to_airport: str):
  6. """预订航班"""
  7. print(f"✅ 成功预订了从 {from_airport} 到 {to_airport} 的航班")
  8. return f"成功预订了从 {from_airport} 到 {to_airport} 的航班。"
  9. flight_assistant = create_react_agent(
  10. model=model,
  11. tools=[book_flight],
  12. prompt=(
  13. "你是专业的航班预订助手,专注于帮助用户预订机票。\n"
  14. "工作流程:\n"
  15. "1. 从用户需求中提取出发地和目的地信息\n"
  16. "2. 调用book_flight工具完成预订\n"
  17. "3. 收到预订成功的确认后,向主管汇报结果并结束\n"
  18. "注意:每次只处理一个预订请求,完成后立即结束,不要重复调用工具。"
  19. ),
  20. name="flight_assistant"
  21. )
  22. hotel_assistant = create_react_agent(
  23. model=model,
  24. tools=[book_hotel],
  25. prompt=(
  26. "你是专业的酒店预订助手,专注于帮助用户预订酒店。\n"
  27. "工作流程:\n"
  28. "1. 从用户需求中提取酒店信息(如果未指定,选择经济型酒店)\n"
  29. "2. 调用book_hotel工具完成预订\n"
  30. "3. 收到预订成功的确认后,向主管汇报结果并结束\n"
  31. "注意:每次只处理一个预订请求,完成后立即结束,不要重复调用工具。"
  32. ),
  33. name="hotel_assistant"
  34. )
  35. supervisor = create_supervisor(
  36. agents=[flight_assistant, hotel_assistant],
  37. model=model,
  38. prompt=(
  39. "你是一个智能任务调度主管,负责协调航班预订助手(flight_assistant)和酒店预订助手(hotel_assistant)。\n\n"
  40. "工作流程:\n"
  41. "1. 分析用户需求,确定需要哪些服务(航班、酒店或两者)\n"
  42. "2. 如果需要预订航班,调用flight_assistant一次\n"
  43. "3. 如果需要预订酒店,调用hotel_assistant一次\n"
  44. "4. 收到助手的预订确认后,记录结果\n"
  45. "5. 当所有任务都完成后,向用户汇总所有预订结果,然后立即结束\n\n"
  46. "关键规则:\n"
  47. "- 每个助手只能调用一次,不要重复调用\n"
  48. "- 看到'成功预订'的消息后,该任务就已完成\n"
  49. "- 所有任务完成后,必须直接结束,不要再调用任何助手\n"
  50. "- 如果已经看到航班和酒店的预订确认,立即汇总并结束"
  51. )
  52. ).compile()
  53. for chunk in supervisor.stream(
  54. {
  55. "messages": [
  56. {
  57. "role": "user",
  58. "content": "帮我预定一个北京到深圳的机票,并且预定一个酒店"
  59. }
  60. ]
  61. }
  62. ):
  63. print(chunk)
  64. print("\n")
复制代码
(, 下载次数: 0)


supervisor支持可以将每个工作Agent的全部消息或最后一条消息添加到整个消息列表中
  1. # 添加所有消息
  2. workflow = create_supervisor(
  3. agents=[agent1, agent2],
  4. output_mode="full_history"
  5. )
  6. # 添加最后一条消息
  7. workflow = create_supervisor(
  8. agents=[agent1, agent2],
  9. output_mode="last_message"
  10. )
复制代码
(, 下载次数: 0)


每一个主管Agent也可以是一个工作Agent,由一个更顶层的主管Agent管理:
  1. research_team = create_supervisor(
  2. [research_agent, math_agent],
  3. model=model,
  4. supervisor_name="research_supervisor"
  5. ).compile(name="research_team")
  6. writing_team = create_supervisor(
  7. [writing_agent, publishing_agent],
  8. model=model,
  9. supervisor_name="writing_supervisor"
  10. ).compile(name="writing_team")
  11. top_level_supervisor = create_supervisor(
  12. [research_team, writing_team],
  13. model=model,
  14. supervisor_name="top_level_supervisor"
  15. ).compile(name="top_level_supervisor")
复制代码
supervisor添加长期记忆和短期记忆:
  1. # 短期记忆
  2. checkpointer = InMemorySaver()
  3. # 长期记忆
  4. store = InMemoryStore()
  5. swarm = create_supervisor(
  6. agents=[flight_assistant, hotel_assistant],
  7. model=model,
  8. ).compile(checkpointer=checkpointer, store=store)
复制代码
3.4 swarm(群组)

智能体根据各自的专长动态地将控制权移交给其他智能体。Swarm 架构则更像一个开放的“专家社区”。它没有中心指挥,每个专业智能体都具备自主判断能力,可以根据当前任务上下文,决定是否以及将控制权“移交”给另一个智能体,形成一种自然的协作流水线。
(, 下载次数: 0)


安装Swarm库:Swarm库是一种多智能体架构的Python库
  1. pip install langgraph-swarm
复制代码
(, 下载次数: 0)

  1. def book_hotel(hotel_name: str):
  2. """预订酒店"""
  3. print(f"✅ 成功预订了 {hotel_name} 的住宿")
  4. return f"成功预订了 {hotel_name} 的住宿。"
  5. def book_flight(from_airport: str, to_airport: str):
  6. """预订航班"""
  7. print(f"✅ 成功预订了从 {from_airport} 到 {to_airport} 的航班")
  8. return f"成功预订了从 {from_airport} 到 {to_airport} 的航班。"
  9. transfer_to_hotel_assistant = create_handoff_tool(
  10. agent_name="hotel_assistant",
  11. description="将用户转接给酒店预订助手。当用户需要预订酒店时使用此工具。",
  12. )
  13. transfer_to_flight_assistant = create_handoff_tool(
  14. agent_name="flight_assistant",
  15. description="将用户转接给航班预订助手。当用户需要预订航班时使用此工具。",
  16. )
  17. flight_assistant = create_react_agent(
  18. model=flight_assistant_model,
  19. tools=[book_flight, transfer_to_hotel_assistant],
  20. prompt=(
  21. "你是一个航班预订助手,负责帮助用户预订航班。"
  22. "当用户需要预订航班时,使用 book_flight 工具完成预订。"
  23. "如果用户还需要预订酒店,完成航班预订后,必须使用 transfer_to_hotel_assistant 工具将用户转接给酒店预订助手。"
  24. "重要:不要直接结束对话,确保所有用户需求都得到处理。"
  25. ),
  26. name="flight_assistant"
  27. )
  28. hotel_assistant = create_react_agent(
  29. model=hotel_assistant_model,
  30. tools=[book_hotel, transfer_to_flight_assistant],
  31. prompt=(
  32. "你是一个酒店预订助手,负责帮助用户预订酒店。"
  33. "当用户需要预订酒店时,使用 book_hotel 工具完成预订。"
  34. "如果用户还需要预订航班,完成酒店预订后,必须使用 transfer_to_flight_assistant 工具将用户转接给航班预订助手。"
  35. "完成所有预订后,向用户确认所有任务已完成。"
  36. ),
  37. name="hotel_assistant"
  38. )
  39. swarm = create_swarm(
  40. agents=[flight_assistant, hotel_assistant],
  41. default_active_agent="flight_assistant"
  42. ).compile()
  43. for chunk in swarm.stream(
  44. {
  45. "messages": [
  46. HumanMessage(content="帮我预订从北京到上海的航班,并预订如家酒店")
  47. ]
  48. }
  49. ):
  50. print(chunk)
  51. print("\n")
复制代码
(, 下载次数: 0)


swarm支持添加长期记忆和短期记忆。
  1. # 短期记忆
  2. checkpointer = InMemorySaver()
  3. # 长期记忆
  4. store = InMemoryStore()
  5. swarm = create_swarm(
  6. agents=[flight_assistant, hotel_assistant],
  7. default_active_agent="flight_assistant"
  8. ).compile(checkpointer=checkpointer, store=store)
复制代码
Supervisor 和 Swarm 代表了两种截然不同但同样强大的协作思想。Supervisor 通过集中控制带来可预测性和可靠性,而 Swarm 通过去中心化设计带来灵活性和韧性。在实际应用中,架构选择没有绝对的优劣,关键在于与业务场景的深度契合。甚至,在复杂的系统中,可以混合使用两种模式,例如核心流程用 Supervisor 严格管控,非核心探索环节用 Swarm 激发灵活性。
3.5 handoffs(交接)

handoffs 指的是一个智能体将控制权交接给另一个智能体,上述的Supervisor 和 Swarm都是使用handoffs来交接执行权的。handoffs需要包含两个最基本的要素:
Supervisor 和 Swarm都默认使用了create_handoff_tool移交工具,我们也可以自己实现交接函数
  1. def create_task_description_handoff_tool(*, agent_name: str, description: str | None = None):
  2. name = f"transfer_to_{agent_name}"
  3. description = description or f"移交给 {agent_name}"
  4. @tool(name, description=description)
  5. def handoff_tool(
  6. task_description: Annotated[str, "描述下一个Agent应该做什么,包括所有相关信息。"],
  7. state: Annotated[MessagesState, InjectedState],
  8. ) -> Command:
  9. task_description_message = {"role": "user", "content": task_description}
  10. agent_input = {**state, "messages": [task_description_message]}
  11. return Command(
  12. goto=[Send(agent_name, agent_input)],
  13. graph=Command.PARENT,
  14. )
  15. return handoff_tool
  16. # 自定义移交工具
  17. transfer_to_hotel_assistant = create_task_description_handoff_tool(
  18. agent_name="hotel_assistant",
  19. description="将执行权移交给酒店预订助手",
  20. )
  21. transfer_to_flight_assistant = create_task_description_handoff_tool(
  22. agent_name="flight_assistant",
  23. description="将执行权移交给航班预订助手",
  24. )
  25. @tool("book_hotel")
  26. def book_hotel(hotel_name: str):
  27. """预订酒店 - 当用户需要预订酒店时必须调用此工具"""
  28. print(f"✅ 成功预订了 {hotel_name} 的住宿")
  29. return f"成功预订了 {hotel_name} 的住宿。"
  30. @tool("book_flight")
  31. def book_flight(from_airport: str, to_airport: str):
  32. """预订航班"""
  33. print(f"✅ 成功预订了从 {from_airport} 到 {to_airport} 的航班")
  34. return f"成功预订了从 {from_airport} 到 {to_airport} 的航班。"
  35. # 定义智能体
  36. flight_assistant = create_react_agent(
  37. model=model,
  38. tools=[book_flight, transfer_to_hotel_assistant],
  39. prompt="你是一个航班预订助手,专门负责帮助用户预订航班。",
  40. name="flight_assistant"
  41. )
  42. hotel_assistant = create_react_agent(
  43. model=model,
  44. tools=[book_hotel, transfer_to_flight_assistant],
  45. prompt="你是酒店预订助手,专门负责帮助用户预订酒店。",
  46. name="hotel_assistant"
  47. )
  48. # 定义多智能体图
  49. multi_agent_graph = (
  50. StateGraph(MessagesState)
  51. .add_node(flight_assistant)
  52. .add_node(hotel_assistant)
  53. .add_edge(START, "flight_assistant")
  54. .compile()
  55. )
  56. multi_agent_graph.invoke(
  57. {
  58. "messages": [
  59. HumanMessage(content="帮我预订从北京到上海的航班,并预订如家酒店")
  60. ]
  61. }
  62. )
复制代码
(, 下载次数: 0)


上述例子(supervisor、swarm、handoffs)在实际测试中运行并不稳定,有时并非按照预期执行相应的工具,或者循环执行工具。可以通过更换模型或者修改提示词尝试解决。
4、JAVA版本介绍(LangChain4J和LangGraph4J)

LangGraph除了python 和 js版本外,还提供了Java版本,如果需要开发复杂的业务系统或者团队使用的技术栈为Java,则LangGraph4j是一个不错的选择。我们团队的项目使用的是Java技术栈,所以这里顺便介绍一下使用LangChain4J+LangGraph4J快速的将AI大模型引入到Java项目中。
由于Spring AI有Spring Boot 3.x + JDK 21的限制,而LangGraph4j是一个独立的库,不依赖Sping Boot,而且使用JDK17,引入成本更低。
本章主要讲LangGraph4j如何使用,具体相关的概念与Python的一样,可参考上文。
4.1 环境准备

Maven依赖:
  1. <!-- LangChain4j -->
  2. <dependency>
  3. <groupId>dev.langchain4j</groupId>
  4. <artifactId>langchain4j</artifactId>
  5. <version>1.6.0</version>
  6. </dependency>
  7. <!-- LangChain4j OpenAI -->
  8. <dependency>
  9. <groupId>dev.langchain4j</groupId>
  10. <artifactId>langchain4j-open-ai</artifactId>
  11. <version>1.2.0</version>
  12. </dependency>
  13. <!-- LangGraph4J -->
  14. <dependency>
  15. <groupId>org.bsc.langgraph4j</groupId>
  16. <artifactId>langgraph4j-core</artifactId>
  17. <version>1.5.2</version>
  18. </dependency>
复制代码
4.2 使用LangChain4J集成大模型

1、调用大模型
  1. public static void main(String[] args) {
  2. // 构建聊天模型实例
  3. ChatModel chatModel = OpenAiChatModel.builder()
  4. .baseUrl(BASE_URL)              // 设置 API 基础地址
  5. .apiKey(API_KEY)                // 设置 API 密钥
  6. .modelName("hunyuan-turbo")     // 指定模型名称
  7. .timeout(Duration.ofSeconds(60)) // 设置请求超时时间为 60 秒
  8. .logRequests(true)              // 开启请求日志,便于调试
  9. .logResponses(true)             // 开启响应日志,便于调试
  10. .maxRetries(3)                  // 设置最大重试次数为 3 次
  11. .temperature(0.8)               // 设置温度参数(0.0-1.0),控制输出的随机性
  12. .returnThinking(true)           // 返回模型的思考过程(针对深思考模型)
  13. .build();
  14. // 创建系统消息,定义 AI 助手的角色和行为
  15. SystemMessage systemMessage = SystemMessage.from("你是一个LangChain和LangGraph专家,用于解答开发者的问题。");
  16. // 创建用户消息,包含具体的问题
  17. UserMessage userMessage = UserMessage.from("介绍一下LangGraph");
  18. // 模型调用
  19. ChatResponse chatResponse = chatModel.chat(systemMessage, userMessage);
  20. // 从响应中提取 AI 消息
  21. AiMessage aiMessage = chatResponse.aiMessage();
  22. // 输出模型的思考过程(如果模型支持并返回)
  23. System.out.println(aiMessage.thinking());
  24. // 输出模型的最终回答
  25. System.out.println(aiMessage.text());
  26. }
复制代码
2、提示词模版
  1. # 创建提示词模版
  2. PromptTemplate promptTemplate = PromptTemplate.from("你是一个{{domain}}领域的专家,用于解答关于{{question}}的开发者问题。");
  3. # 填充参数
  4. String prompt = promptTemplate.apply(Map.of(
  5. "domain", "LangChain和LangGraph",
  6. "question", "LangGraph"
  7. )).text();
  8. chatModel.chat(prompt);
复制代码
3、AI Service
AI Service 是 LangChain4j 框架中一个高级的、声明式的 API,能够像定义普通 Java Service 接口一样来定义 AI 功能,从而极大地简化与大模型的集成。
  1. // 定义一个反洗钱助手接口
  2. interface RiskAssistant {
  3. @SystemMessage("你是一个专注于反洗钱业务的专家助手")
  4. @UserMessage("请回答用户关于反洗钱的提问,问题:{{question}}")
  5. String answer(@V("question") String question);
  6. }
  7. public static void main(String[] args) {
  8. // 构建聊天模型实例
  9. ChatModel chatModel = OpenAiChatModel.builder().baseUrl(BASE_URL)
  10. .apiKey(API_KEY)
  11. .modelName("hunyuan-turbo")
  12. .build();
  13. // 通过 AiServices 创建实例
  14. RiskAssistant riskAssistant = AiServices.create(RiskAssistant.class, chatModel);
  15. String answer = riskAssistant.answer("什么是EDD?");
  16. System.out.println(answer);
  17. }
复制代码
4、添加记忆
  1. RiskAssistant riskAssistant = AiServices.builder(RiskAssistant.class)
  2. .chatModel(chatModel)
  3. // 添加记忆能力,保存用户最近 10 条对话,也可以自定义记忆能力
  4. .chatMemory(MessageWindowChatMemory.withMaxMessages(10))
  5. .build();
复制代码
5、使用工具
  1. public static class StockTools {
  2. @Tool("查询公司股价")
  3. public String getStockPrice(@P("公司名称") String company) {
  4. return "1000";
  5. }
  6. }
  7. public static void main(String[] args) {
  8. StockAssistant assistant = AiServices.builder(StockAssistant.class)
  9. .chatModel(chatModel)
  10. // 添加工具
  11. .tools(new StockTools())
  12. .build();
  13. }
复制代码
6、Guardrail(防护机制)
通过预设的规则来验证和过滤模型的输入与输出,确保交互过程的安全、可靠和合规。分为输入Guardrail 和 输出Guardrail
  1. // 输出 Guardrail
  2. public static class SensitiveInputGuardrail implements InputGuardrail {
  3. private static final Set<String> SENSITIVE_WORDS = Set.of("作弊", "开挂", "攻击");
  4. @Override
  5. public InputGuardrailResult validate(UserMessage userMessage) {
  6. String userInput = userMessage.singleText();
  7. for (String word : SENSITIVE_WORDS) {
  8. if (userInput.contains(word)) {
  9. // 发现敏感词,立即终止请求
  10. return fatal("您的请求包含违规内容,已被拦截。");
  11. }
  12. }
  13. // 输入合法,放行
  14. return InputGuardrailResult.success();
  15. }
  16. }
  17. // 输出 Guardrail
  18. public static class ContentSafetyOutputGuardrail implements OutputGuardrail {
  19. private static final Set<String> SENSITIVE_WORDS = Set.of("作弊", "开挂", "攻击");
  20. @Override
  21. public OutputGuardrailResult validate(AiMessage aiMessage) {
  22. String aiResponse = aiMessage.text();
  23. // 判断输出内容是否合法,自定义函数
  24. if (isSensitiveContent(aiResponse)) {
  25. // 策略1:直接拦截并报错
  26. // return failure("输出内容不合规");
  27. // 策略2:要求模型重试,给予一次修正机会
  28. return retry("请以更安全、中立的方式重新生成回答");
  29. }
  30. return OutputGuardrailResult.success();
  31. }
  32. }
  33. // 使用注解将Guardrail应用于整个AI服务
  34. @InputGuardrails(SensitiveInputGuardrail.class)
  35. @OutputGuardrails(ContentSafetyOutputGuardrail.class)
  36. public interface MyAIService {
  37. String chat(String userMessage);
  38. }
  39. public static void main(String[] args) {
  40. ChatModel chatModel = OpenAiChatModel.builder()
  41. .baseUrl(BASE_URL)
  42. .apiKey(API_KEY)
  43. .modelName("hunyuan-turbo")
  44. .build();
  45. MyAIService myAI = AiServices.builder(MyAIService.class)
  46. .chatModel(chatModel)
  47. // 使用注解或构造器的方式指定Guardrail
  48. // .inputGuardrails(new SensitiveInputGuardrail())
  49. // .outputGuardrails(new ContentSafetyOutputGuardrail())
  50. .build();
  51. }
复制代码
7、多模态
  1. ChatModel chatModel = OpenAiChatModel.builder()
  2. .baseUrl(BASE_URL)
  3. .apiKey(API_KEY)
  4. .modelName("hunyuan-ocr")          // 设置模型名称,需支持多模态
  5. .build();
  6. byte[] imageBytes = Files.readAllBytes(Paths.get(IMAGE_PATH));
  7. String base64ImageData = Base64.getEncoder().encodeToString(imageBytes);
  8. // 创建包含文本和图片内容的用户消息
  9. // 使用 TextContent 和 ImageContent 组合构建多模态消息
  10. UserMessage userMessage = UserMessage.from(
  11. TextContent.from("描述图片的内容"),
  12. ImageContent.from(base64ImageData, "image/png"));
  13. ChatResponse chat = chatModel.chat(userMessage);
复制代码
4.3 使用LangGraph4J构建工作流

1、创建图(Node、Edge、State)
  1. public static void main(String[] args) throws GraphStateException {
  2. StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
  3. // 添加节点,node_async表示同步执行
  4. graph.addNode("input_node", AsyncNodeAction.node_async(state -> {
  5. System.out.println("[input_node] 接收到状态: " + state.data());
  6. // 返回要更新的数据,默认规则与上一个节点的数据合并
  7. return Map.of("input_node", "input_node");
  8. }));
  9. graph.addNode("process_node", AsyncNodeAction.node_async(state -> {
  10. System.out.println("[process_node] 接收到状态: " + state.data());
  11. return Map.of("process_node", "process_node");
  12. }));
  13. // 添加边, START -> input_node -> process_node -> END
  14. graph.addEdge(StateGraph.START, "input_node");
  15. graph.addEdge("input_node", "process_node");
  16. graph.addEdge("process_node", StateGraph.END);
  17. // 编译图
  18. CompiledGraph<AgentState> compile = graph.compile();
  19. // 初始状态
  20. Map<String, Object> initialData = new HashMap<>();
  21. initialData.put("init_data", "init_data");
  22. // 执行图
  23. Optional<AgentState> invoke = compile.invoke(initialData);
  24. invoke.ifPresent(state -> System.out.println("最终状态: " + state.data()));
  25. }
复制代码
2、状态合并策略(Channels,类似与python的Reducer)
  1. public static void main(String[] args) throws GraphStateException {
  2. // 定义Channels,指定每个状态字段的合并策略
  3. Map<String, Channel<?>> channels = new LinkedHashMap<>();
  4. // 集合追加
  5. channels.put("messages", Channels.appender(ArrayList::new));
  6. // 返回两数之和
  7. channels.put("counter", Channels.base(Integer::sum, () -> 0));
  8. // 返回最大值
  9. channels.put("max_score", Channels.base(Math::max, () -> 0));
  10. // 创建图,并指定状态字段合并策略
  11. StateGraph<AgentState> graph = new StateGraph<>(channels, AgentState::new);
  12. // 添加节点
  13. graph.addNode("node1", AsyncNodeAction.node_async(state -> {
  14. System.out.println("node1 -> " + state);
  15. return Map.of("messages", "node1", "counter", 3, "max_score", 85, "current_step", "node2");
  16. }));
  17. graph.addNode("node2", AsyncNodeAction.node_async(state -> {
  18. System.out.println("node2 -> " + state);
  19. return Map.of("messages", "node2", "counter", 5, "max_score", 72, "current_step", "node2");
  20. }));
  21. graph.addNode("node3", AsyncNodeAction.node_async(state -> {
  22. System.out.println("node3 -> " + state);
  23. return Map.of("messages", "node3", "counter", 2, "max_score", 95, "current_step", "node3");
  24. }));
  25. // 添加边
  26. graph.addEdge(StateGraph.START, "node1");
  27. graph.addEdge("node1", "node2");
  28. graph.addEdge("node2", "node3");
  29. graph.addEdge("node3", StateGraph.END);
  30. // 编译并执行
  31. CompiledGraph<AgentState> compile = graph.compile();
  32. compile.invoke(new HashMap<>()).ifPresent(state -> {
  33. System.out.println("final state: " + state);
  34. });
  35. }
复制代码
3、条件边
  1. public static void main(String[] args) throws GraphStateException {
  2. StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
  3. // 添加节点、其他边...
  4. // 定义条件映射关系,key为条件,value为目标节点名
  5. Map<String, String> mappings = new HashMap<>();
  6. mappings.put("pass", "pass_handler");
  7. mappings.put("fail", "fail_handler");
  8. graph.addConditionalEdges("node", agentState -> {
  9. // 自定义路由条件...
  10. // 通过State中获取条件,然后判断需要路由的下一个节点
  11. int score = (Integer) agentState.value("score").orElse(0);
  12. return CompletableFuture.completedFuture(score >= 90 ? "pass" : "fail");
  13. }, mappings);
  14. }
复制代码
4、检查点(Checkpoint)
  1. public static void main(String[] args) throws GraphStateException {
  2. // 定义检查点保存器
  3. MemorySaver checkpoint = new MemorySaver();
  4. CompileConfig config = CompileConfig.builder()
  5. .checkpointSaver(checkpoint)
  6. .build();
  7. StateGraph<AgentState> graph = new StateGraph<>( AgentState::new);
  8. // 添加节点、边...
  9. // 编译图时指定检查点保存器
  10. CompiledGraph<AgentState> compile = graph.compile(config);
  11. compile.invoke(new HashMap<>());
  12. }
复制代码
5、人机协作(Human-in-the-Loop)
  1. public static void main(String[] args) throws Exception {
  2. MemorySaver checkpointer = new MemorySaver();
  3. StateGraph<AgentState> graph = new StateGraph<>(AgentState::new);
  4. // 节点1: 接收用户问题
  5. graph.addNode("receive_question", AsyncNodeAction.node_async(state -> {
  6. // 业务逻辑...
  7. return Map.of("status", "received", "timestamp", System.currentTimeMillis());
  8. }));
  9. // 节点2: AI尝试回答
  10. graph.addNode("ai_answer", AsyncNodeAction.node_async(state -> {
  11. // 业务逻辑...
  12. return Map.of("status", "ai_answered", "ai_response", "转人工", "confidence", 40);
  13. }));
  14. // 节点3: 人工介入
  15. graph.addNode("human_agent", AsyncNodeAction.node_async(state -> {
  16. // 业务逻辑...
  17. return Map.of("human_response", "人工回复...", "handled_by", "human", "status", "human_handled");
  18. }));
  19. // 节点4: 完成并汇总
  20. graph.addNode("complete", AsyncNodeAction.node_async(state -> {
  21. // 业务逻辑...
  22. return Map.of("status", "completed", "completion_time", System.currentTimeMillis());
  23. }));
  24. // 添加边
  25. graph.addEdge(StateGraph.START, "receive_question");
  26. graph.addEdge("receive_question", "ai_answer");
  27. // 条件边:根据AI置信度决定是否需要人工
  28. graph.addConditionalEdges(
  29. "ai_answer",
  30. state -> CompletableFuture.completedFuture(
  31. (Integer) state.value("confidence").orElse(0) < 60 ? "human" : "complete"
  32. ),
  33. Map.of("human", "human_agent", "complete", "complete")
  34. );
  35. graph.addEdge("human_agent", "complete");
  36. graph.addEdge("complete", StateGraph.END);
  37. // 配置:在human_agent执行前前中断
  38. CompileConfig config = CompileConfig.builder()
  39. .checkpointSaver(checkpointer)
  40. .interruptBefore("human_agent")
  41. .build();
  42. CompiledGraph<AgentState> compile = graph.compile(config);
  43. RunnableConfig runnableConfig = RunnableConfig.builder().threadId("thread1").build();
  44. // 首次执行,执行human_agent前会中断
  45. Optional<AgentState> invoke = compile.invoke(Map.of("user_question", "如何退款?"), runnableConfig);
  46. invoke.ifPresent(state -> System.out.println("final state: " + state));
  47. // 模拟用户输入....
  48. Map<String, Object> humanInput = new HashMap<>();
  49. humanInput.put("human_response", "人工回复");
  50. humanInput.put("agent_name", "human_agent");
  51. RunnableConfig updatedConfig2 = compile.updateState(runnableConfig, humanInput);
  52. // 用户输入后再次执行,从human_agent开始执行
  53. Optional<AgentState> invoke1 = compile.invoke(null, updatedConfig2);
  54. invoke1.ifPresent(state -> System.out.println("final state: " + state));
  55. }
复制代码
除了上述例子,LangGraph4j还提供了一个标准AgentExecutor 类(也称为 ReACT Agent)用于支持人工审批工作流程。可参考: https://bsorrentino.github.io/bsorrentino/ai/2025/07/13/LangGraph4j-Agent-with-approval.html
LangChain4J 和 LangGraph4J的基本使用就介绍到这里,其核心思想与Python一样,只是换了一种调用方式而已,实际工作或学习中可以根据场景或个人爱好选择。
至此,关于LangGraph的介绍就全部结束了,LangGraph作为多智能体应用的编排框架,通过图结构、灵活的状态管理和控制流,为构建复杂的多智能体应用提供了强大基础设施。它作为LangChian生态的一部分,可以直接利用 LangChain 庞大的工具库和模型集成,无需从零开始编写所有功能,可以轻松调用各种现成的工具(如搜索引擎、数据库查询工具等)和模型,快速搭建起强大的多智能体应用。LangGraph除了Python版本还提供了JS和JAVA版本,实际开发中可以根据具体的应用场景选择合适的语言。
如何学习大模型 AI ?

由于新岗位的生产效率,要优于被取代岗位的生产效率,所以实际上整个社会的生产效率是提升的。
但是具体到个人,只能说是:
“最先掌握AI的人,将会比较晚掌握AI的人有竞争优势”。
这句话,放在计算机、互联网、移动互联网的开局时期,都是一样的道理。
我在一线互联网企业工作十余年里,指导过不少同行后辈。帮助很多人得到了学习和成长。
我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑,所以在工作繁忙的情况下还是坚持各种整理和分享。但苦于知识传播途径有限,很多互联网行业朋友无法获得正确的资料得到学习提升,故此将并将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。
这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】
(, 下载次数: 0)


(, 下载次数: 0)


为什么要学习大模型?

我国在A大模型领域面临人才短缺,数量与质量均落后于发达国家。2023年,人才缺口已超百万,凸显培养不足。随着AI技术飞速发展,预计到2025年,这一缺口将急剧扩大至400万,严重制约我国AI产业的创新步伐。加强人才培养,优化教育体系,国际合作并进是破解困局、推动AI发展的关键。
(, 下载次数: 0)


(, 下载次数: 0)


大模型入门到实战全套学习大礼包

1、大模型系统化学习路线

作为学习AI大模型技术的新手,方向至关重要。 正确的学习路线可以为你节省时间,少走弯路;方向不对,努力白费。这里我给大家准备了一份最科学最系统的学习成长路线图和学习规划,带你从零基础入门到精通!



2、大模型学习书籍&文档

学习AI大模型离不开书籍文档,我精选了一系列大模型技术的书籍和学习文档(电子版),它们由领域内的顶尖专家撰写,内容全面、深入、详尽,为你学习大模型提供坚实的理论基础。
(, 下载次数: 0)


3、AI大模型最新行业报告

2025最新行业报告,针对不同行业的现状、趋势、问题、机会等进行系统地调研和评估,以了解哪些行业更适合引入大模型的技术和应用,以及在哪些方面可以发挥大模型的优势。
(, 下载次数: 0)


4、大模型项目实战&配套源码

学以致用,在项目实战中检验和巩固你所学到的知识,同时为你找工作就业和职业发展打下坚实的基础。
(, 下载次数: 0)


5、大模型大厂面试真题

面试不仅是技术的较量,更需要充分的准备。在你已经掌握了大模型技术之后,就需要开始准备面试,我精心整理了一份大模型面试题库,涵盖当前面试中可能遇到的各种技术问题,让你在面试中游刃有余
(, 下载次数: 0)


适用人群

(, 下载次数: 0)


第一阶段(10天):初阶应用

该阶段让大家对大模型 AI有一个最前沿的认识,对大模型 AI 的理解超过 95% 的人,可以在相关讨论时发表高级、不跟风、又接地气的见解,别人只会和 AI 聊天,而你能调教 AI,并能用代码将大模型和业务衔接。
第二阶段(30天):高阶应用

该阶段我们正式进入大模型 AI 进阶实战学习,学会构造私有知识库,扩展 AI 的能力。快速开发一个完整的基于 agent 对话机器人。掌握功能最强的大模型开发框架,抓住最新的技术进展,适合 Python 和 JavaScript 程序员。
第三阶段(30天):模型训练

恭喜你,如果学到这里,你基本可以找到一份大模型 AI相关的工作,自己也能训练 GPT 了!通过微调,训练自己的垂直大模型,能独立训练开源多模态大模型,掌握更多技术方案。
到此为止,大概2个月的时间。你已经成为了一名“AI小子”。那么你还想往下探索吗?
第四阶段(20天):商业闭环

对全球大模型从性能、吞吐量、成本等方面有一定的认知,可以在云端和本地等多种环境下部署大模型,找到适合自己的项目/创业方向,做一名被 AI 武装的产品经理。
学习是一个过程,只要学习就会有挑战。天道酬勤,你越努力,就会成为越优秀的自己。
如果你能在15天内完成所有的任务,那你堪称天才。然而,如果你能完成 60-70% 的内容,你就已经开始具备成为一名大模型 AI 的正确特征了。
这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费】




原文地址:https://blog.csdn.net/python12345_/article/details/156047030




欢迎光临 AI创想 (http://llms-ai.com/) Powered by Discuz! X3.4