开启左侧

Langchain 工作流编排

[复制链接]
米落枫 发表于 2025-9-7 23:38:37 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
一、工作流编排核心架构

LangChain通过LCEL(LangChain Expression Language)Runnable接口构建声明式工作流,其核心原理是:
    所有组件实现Runnable接口标准化调用使用管道符|连接组件形成链(Chain)自动支持流式传输/异步/并行等特性
  1. from langchain_core.runnables import RunnablePassthrough
  2. chain = prompt | model | output_parser  # LCEL管道语法
复制代码

二、LCEL(LangChain Expression Language)

核心作用

    声明式组合:用|运算符连接Runnable组件自动类型推导:验证组件输入输出兼容性运行时优化:自动并行/批处理可加速的环节
基础用法
  1. from langchain_core.prompts import ChatPromptTemplate
  2. from langchain_openai import ChatOpenAI
  3. prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的故事")
  4. model = ChatOpenAI()
  5. chain = prompt | model |(lambda x: x.content)# 组合Prompt+LLM+输出解析
  6. chain.invoke({"topic":"龙"})# 输出完整故事文本
复制代码
高级组合
  1. # 分支与合并from langchain_core.runnables import RunnableParallel
  2. map_chain = RunnableParallel(
  3.     joke=chain,
  4.     poem=poem_chain  # 假设已定义的诗生成链)
复制代码

三、Runnable接口

统一接口方法

所有组件(模型/提示词/解析器等)都实现以下方法:
  1. classRunnable(Protocol):definvoke(self,input: Any)-> Any:...# 同步调用asyncdefainvoke(self,input: Any)-> Any:...# 异步调用defbatch(self, inputs: List)-> List:...# 批量处理defstream(self,input: Any)-> Iterator:...# 流式输出
复制代码
关键特性

    自动适配:无论输入是dict/str/list都会自动处理组合嵌套:支持任意深度的链式组合生命周期钩子:可通过事件监听进行调试
  1. # 自定义Runnablefrom langchain_core.runnables import RunnableLambda
  2. deflength_func(text:str)->int:returnlen(text)
  3. length_chain = RunnableLambda(length_func)
  4. length_chain.invoke("hello")# 输出5
复制代码

四、Stream流式传输

实现原理

    逐步生成输出而非等待全部完成每个处理环节均支持流式(模型/转换器等)
同步流式
  1. for chunk in chain.stream({"topic":"太空"}):print(chunk, end="", flush=True)# 逐词打印生成的文本
复制代码
异步流式
  1. asyncfor chunk in chain.astream({"topic":"火星"}):print(chunk, end="", flush=True)
复制代码

五、Stream Events事件驱动

事件类型

通过stream_events方法捕获完整生命周期事件:
    on_chain_start:链开始执行on_llm_start:调用LLMon_chat_model_stream:聊天模型输出块on_chain_end:链执行完成
事件监听示例
  1. from langchain_core.tracers import ConsoleCallbackHandler
  2. events = chain.stream_events({"topic":"AI"},
  3.     version="v1",
  4.     config={"callbacks":[ConsoleCallbackHandler()]}# 控制台打印事件)for event in events:if event["event"]=="on_chat_model_stream":print(f"收到块: {event['data']['chunk'].content}")
复制代码
典型事件结构
  1. {"event":"on_llm_start","name":"ChatOpenAI","run_id":"uuid123","data":{"input":"龙的传说"}}
复制代码

六、综合应用示例
  1. from langchain_core.runnables import RunnablePassthrough
  2. # 完整工作流
  3. full_chain =({"context": load_context,"query": RunnablePassthrough()}# 并行执行| prompt_template
  4.     | model.with_config({"temperature":0.7})| output_parser
  5. )# 流式+事件监听asyncdefprocess_query(query:str):asyncfor event in full_chain.astream_events(
  6.         query,
  7.         version="v1",
  8.         config={"metadata":{"user_id":"123"}}):if event["event"]=="on_chat_model_stream":yield event["data"]["chunk"].content
复制代码

七、核心优势总结

特性作用技术实现
LCEL声明式组合/类型安全/优化执行操作符重载+类型系统
Runnable统一接口/异步支持/组合能力接口协议+适配器模式
Stream低延迟输出/内存优化生成器+异步迭代器
Events实时监控/调试/中间结果捕获观察者模式+回调系统
通过这四者的结合,LangChain实现了:
    灵活组合:任意模块的自由拼接高效执行:自动批处理/流式/并行全链路可控:事件系统提供深度可观测性
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )