开启左侧

【LangGraph】使用 LangGraph 的 Pregel 运行时日志

[复制链接]
创想小编 发表于 昨天 09:22 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
作者:彬彬侠
使用 LangGraph 时,Pregel 作为其底层计算模型,驱动图的执行,而运行时日志(Runtime Logging)是调试和监控 LangGraph 工作流的重要工具。LangGraph 提供了灵活的日志机制,帮助开发者跟踪节点执行、状态更新、消息传递等信息。以下是详细介绍如何使用 LangGraph 的 Pregel 运行时日志,结合 Pregel 的执行机制,确保解答准确、清晰、全面,并以中文呈现。

1. LangGraph 和 Pregel 的运行时日志背景

LangGraph 基于 Pregel 模型运行图工作流,每个超步(Superstep)执行一组节点,更新状态,并通过边传递控制流。运行时日志可以帮助:
    调试:检查每个节点的输入、输出和状态变化。监控:跟踪工作流的执行路径、性能和错误。优化:分析超步的执行时间或状态更新开销。
LangGraph 的日志通常依赖 Python 的日志模块(logging)或 LangChain 生态的回调机制(Callbacks)。Pregel 的运行时日志记录了每个超步的节点执行、状态更新和条件跳转等信息。

2. 设置 LangGraph 运行时日志的步骤

以下是使用 LangGraph 的 Pregel 运行时日志的详细步骤,涵盖基本配置、自定义日志和高级用法。
步骤 1:配置 Python 日志模块

LangGraph 使用 Python 的 logging 模块记录运行时信息。你需要确保日志级别和输出格式正确配置。
示例代码
  1. import logging
  2. # 配置日志
  3. logging.basicConfig(
  4.     level=logging.INFO,# 设置日志级别(DEBUG、INFO、WARNING、ERROR)format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
  5.     handlers=[
  6.         logging.StreamHandler()# 输出到控制台# 可添加 logging.FileHandler("logfile.log") 输出到文件])# 获取 LangGraph 的日志器
  7. logger = logging.getLogger("langgraph")
复制代码
说明
    level=logging.DEBUG:捕获详细日志,包括每个节点的输入/输出。level=logging.INFO:捕获主要事件,如节点执行和状态更新。handlers:支持输出到控制台、文件或外部服务(如 ELK 栈)。
步骤 2:构建 LangGraph 工作流

假设你有一个简单的 LangGraph 工作流,包含状态和节点。我们将为其添加日志。
示例代码
  1. from typing import TypedDict, Annotated
  2. from langgraph.graph import StateGraph
  3. from pydantic import Field
  4. # 定义状态classState(TypedDict):
  5.     user_input: Annotated[str, Field(description="User's input")]
  6.     response: Annotated[str, Field(description="Generated response")]
  7.     step_count:int# 节点:生成响应defgenerate_response(state: State)-> State:
  8.     logger.info(f"Processing user_input: {state['user_input']}")
  9.     response =f"Echo: {state['user_input']}"
  10.     logger.debug(f"Generated response: {response}")return{"response": response,"step_count": state["step_count"]+1}# 构建图
  11. graph = StateGraph(State)
  12. graph.add_node("generate", generate_response)
  13. graph.set_entry_point("generate")
  14. graph.set_finish_point("generate")# 编译图
  15. app = graph.compile()
复制代码
说明
    logger.info 和 logger.debug 在节点中记录关键信息。State 使用 TypedDict 和 Annotated 定义,确保类型安全和元数据支持。
步骤 3:运行并查看日志

运行工作流,观察 Pregel 的运行时日志。
示例代码
  1. # 执行工作流
  2. initial_state ={"user_input":"Hello!","response":"","step_count":0}
  3. result = app.invoke(initial_state)# 打印结果print(result)
复制代码
示例输出(日志):
  1. 2025-05-14 21:21:45,123 - langgraph - INFO - Processing user_input: Hello!
  2. 2025-05-14 21:21:45,124 - langgraph - DEBUG - Generated response: Echo: Hello!
  3. {'user_input': 'Hello!', 'response': 'Echo: Hello!', 'step_count': 1}
复制代码
说明
    日志显示了节点 generate 的执行过程,包括输入和输出。Pregel 的超步隐式执行,日志记录了每个节点的计算。
步骤 4:使用 LangChain 回调机制

LangGraph 继承了 LangChain 的回调系统(Callbacks),可以通过自定义回调捕获更详细的运行时信息,如节点开始/结束、状态更新等。
示例代码
  1. from langchain_core.callbacks import BaseCallbackHandler
  2. from langgraph.graph import StateGraph
  3. # 自定义回调classLoggingCallback(BaseCallbackHandler):defon_chain_start(self, serialized, inputs,**kwargs):
  4.         logger.info(f"Starting graph execution with inputs: {inputs}")defon_chain_end(self, outputs,**kwargs):
  5.         logger.info(f"Graph execution completed with outputs: {outputs}")defon_node_start(self, serialized, inputs, node_name,**kwargs):
  6.         logger.debug(f"Node {node_name} started with inputs: {inputs}")defon_node_end(self, outputs, node_name,**kwargs):
  7.         logger.debug(f"Node {node_name} ended with outputs: {outputs}")# 构建图(同上)
  8. graph = StateGraph(State)
  9. graph.add_node("generate", generate_response)
  10. graph.set_entry_point("generate")
  11. graph.set_finish_point("generate")# 编译图并添加回调
  12. app = graph.compile(callbacks=[LoggingCallback()])# 执行
  13. result = app.invoke({"user_input":"Hello!","response":"","step_count":0})
复制代码
示例输出
  1. 2025-05-14 21:21:45,123 - langgraph - INFO - Starting graph execution with inputs: {'user_input': 'Hello!', 'response': '', 'step_count': 0}
  2. 2025-05-14 21:21:45,124 - langgraph - DEBUG - Node generate started with inputs: {'user_input': 'Hello!', 'response': '', 'step_count': 0}
  3. 2025-05-14 21:21:45,125 - langgraph - INFO - Processing user_input: Hello!
  4. 2025-05-14 21:21:45,126 - langgraph - DEBUG - Generated response: Echo: Hello!
  5. 2025-05-14 21:21 Virgen,125 - langgraph - DEBUG - Node generate ended with outputs: {'response': 'Echo: Hello!', 'step_count': 1}
  6. 2025-05-14 21:21:45,127 - langgraph - INFO - Graph execution completed with outputs: {'user_input': 'Hello!', 'response': 'Echo: Hello!', 'step_count': 1}
复制代码
说明
    LoggingCallback 捕获了图和节点的生命周期事件(开始/结束)。日志详细记录了 Pregel 的超步执行,包括每个节点的输入和输出。
步骤 5:高级日志配置(可选)

对于复杂工作流,可以进一步优化日志:

  • 输出到文件
    1. logging.basicConfig(
    2.     level=logging.DEBUG,
    3.     handlers=[
    4.         logging.FileHandler("langgraph.log"),# 保存到文件
    5.         logging.StreamHandler()# 同时输出到控制台])
    复制代码
  • 结构化日志
    使用 structlog 或 loguru 替代标准 logging,支持 JSON 格式日志,便于分析。
    1. from loguru import logger
    2. # 替换标准 logger
    3. logger.add("langgraph.log",format="{time} {level} {message}", level="DEBUG")
    复制代码
    集成外部工具
      将日志发送到监控系统(如 Prometheus、ELK 栈)。使用 logging.handlers.HTTPHandler 将日志推送到远程服务器。


3. Pregel 日志的关键信息

在 LangGraph 的 Pregel 运行时日志中,你可以捕获以下关键信息:

  • 超步信息
      每个超步(Superstep)执行的节点列表。超步的开始和结束时间(用于性能分析)。

  • 节点执行
      节点的输入状态和输出状态。节点内的日志(如 logger.info 输出)。

  • 状态更新
      状态字段的变化(如 response 从空变为 Echo: Hello!)。

  • 条件跳转
      条件边的触发情况(如“跳转到节点 X”)。

  • 错误和异常
      节点执行失败的详细信息(如 ValidationError、工具调用错误)。

  • 消息传递
      在复杂工作流中,节点间的消息或状态传递记录。

示例:捕获条件跳转日志
  1. from langgraph.graph import StateGraph, END
  2. # 定义状态classState(TypedDict):
  3.     user_input:str
  4.     response:str
  5.     step_count:int# 节点:决定是否继续defdecide_continue(state: State)->str:
  6.     logger.info(f"Deciding with step_count: {state['step_count']}")if state["step_count"]>=3:return END
  7.     return"generate"# 节点:生成响应defgenerate_response(state: State)-> State:
  8.     logger.info(f"Generating response for: {state['user_input']}")return{"response":f"Echo: {state['user_input']}","step_count": state["step_count"]+1}# 构建图
  9. graph = StateGraph(State)
  10. graph.add_node("generate", generate_response)
  11. graph.add_node("decide", decide_continue)
  12. graph.add_edge("generate","decide")
  13. graph.add_conditional_edges("decide", decide_continue)
  14. graph.set_entry_point("generate")# 编译并添加回调
  15. app = graph.compile(callbacks=[LoggingCallback()])# 执行
  16. result = app.invoke({"user_input":"Hello!","response":"","step_count":0})
复制代码
示例输出
  1. 2025-05-14 21:21:45,123 - langgraph - INFO - Starting graph execution with inputs: {'user_input': 'Hello!', 'response': '', 'step_count': 0}
  2. 2025-05-14 21:21:45,124 - langgraph - DEBUG - Node generate started with inputs: {'user_input': 'Hello!', 'response': '', 'step_count': 0}
  3. 2025-05-14 21:21:45,125 - langgraph - INFO - Generating response for: Hello!
  4. 2025-05-14 21:21:45,126 - langgraph - DEBUG - Node generate ended with outputs: {'response': 'Echo: Hello!', 'step_count': 1}
  5. 2025-05-14 21:21:45,127 - langgraph - DEBUG - Node decide started with inputs: {'user_input': 'Hello!', 'response': 'Echo: Hello!', 'step_count': 1}
  6. 2025-05-14 21:21:45,128 - langgraph - INFO - Deciding with step_count: 1
  7. 2025-05-14 21:21:45,129 - langgraph - DEBUG - Node decide ended with outputs: generate
  8. ...
  9. 2025-05-14 21:21:45,135 - langgraph - INFO - Graph execution completed with outputs: {'user_input': 'Hello!', 'response': 'Echo: Hello!', 'step_count': 3}
复制代码
说明
    日志捕获了条件跳转(decide_continue 返回 generate 或 END)。Pregel 的超步迭代直到 step_count >= 3,然后终止。

4. 调试和优化技巧

    调试节点逻辑
      在节点函数中使用 logger.debug 记录中间变量或状态。
    • 使用回调的 on_node_error 捕获节点异常:
      1. defon_node_error(self, error, node_name,**kwargs):
      2.     logger.error(f"Node {node_name} failed with error: {error}")
      复制代码
    分析性能

    • 记录每个超步的执行时间:
      1. import time
      2. defgenerate_response(state: State)-> State:
      3.     start_time = time.time()
      4.     response =f"Echo: {state['user_input']}"
      5.     logger.info(f"Node generate took {time.time()- start_time:.2f} seconds")return{"response": response,"step_count": state["step_count"]+1}
      复制代码
    过滤无关日志

    • 设置特定模块的日志级别:
      1. logging.getLogger("langgraph").setLevel(logging.DEBUG)
      2. logging.getLogger("langchain").setLevel(logging.WARNING)# 降低其他模块日志
      复制代码
    可视化日志
      使用 graph.get_graph().draw_ascii() 可视化图结构,结合日志分析执行路径。将日志导入分析工具(如 Grafana)绘制执行时间或状态变化图。


5. 常见问题解答

    如何捕获 Pregel 的超步详细信息?
      使用回调的 on_chain_start 和 on_chain_end 捕获超步边界。自定义回调记录节点间的状态传递。
    日志量过大怎么办?
      提高日志级别(如 INFO 而非 DEBUG)。使用 structlog 或 loguru 过滤无关日志。只在关键节点添加详细日志。
    如何记录工具调用日志?

    • 为工具函数添加日志:
      1. from langchain.tools import tool
      2. @tooldefsearch(query:str)->str:
      3.     logger.info(f"Tool search called with query: {query}")returnf"Results for {query}"
      复制代码
    分布式环境下的日志如何处理?
      使用分布式日志收集工具(如 ELK、Fluentd)。确保日志包含机器标识和超步编号,便于聚合分析。


6. 总结

    LangGraph 的 Pregel 日志:通过 Python logging 和 LangChain 回调机制,记录超步、节点执行、状态更新和条件跳转。
  • 配置方法
      使用 logging.basicConfig 设置日志级别和输出。自定义 BaseCallbackHandler 捕获节点和图的生命周期事件。
    关键信息:超步执行、节点输入/输出、状态变化、条件跳转、错误。优化技巧:调试节点、分析性能、过滤日志、可视化执行路径。Pregel 的作用:驱动 LangGraph 的迭代执行,日志反映了超步的计算和状态传递。

原文地址:https://blog.csdn.net/u013172930/article/details/147980148
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )