AI创想

标题: Langchain 工作流编排 [打印本页]

作者: 米落枫    时间: 2025-9-7 23:38
标题: Langchain 工作流编排
一、工作流编排核心架构

LangChain通过LCEL(LangChain Expression Language)Runnable接口构建声明式工作流,其核心原理是:
  1. from langchain_core.runnables import RunnablePassthrough
  2. chain = prompt | model | output_parser  # LCEL管道语法
复制代码

二、LCEL(LangChain Expression Language)

核心作用

基础用法
  1. from langchain_core.prompts import ChatPromptTemplate
  2. from langchain_openai import ChatOpenAI
  3. prompt = ChatPromptTemplate.from_template("讲一个关于{topic}的故事")
  4. model = ChatOpenAI()
  5. chain = prompt | model |(lambda x: x.content)# 组合Prompt+LLM+输出解析
  6. chain.invoke({"topic":"龙"})# 输出完整故事文本
复制代码
高级组合
  1. # 分支与合并from langchain_core.runnables import RunnableParallel
  2. map_chain = RunnableParallel(
  3.     joke=chain,
  4.     poem=poem_chain  # 假设已定义的诗生成链)
复制代码

三、Runnable接口

统一接口方法

所有组件(模型/提示词/解析器等)都实现以下方法:
  1. classRunnable(Protocol):definvoke(self,input: Any)-> Any:...# 同步调用asyncdefainvoke(self,input: Any)-> Any:...# 异步调用defbatch(self, inputs: List)-> List:...# 批量处理defstream(self,input: Any)-> Iterator:...# 流式输出
复制代码
关键特性

  1. # 自定义Runnablefrom langchain_core.runnables import RunnableLambda
  2. deflength_func(text:str)->int:returnlen(text)
  3. length_chain = RunnableLambda(length_func)
  4. length_chain.invoke("hello")# 输出5
复制代码

四、Stream流式传输

实现原理

同步流式
  1. for chunk in chain.stream({"topic":"太空"}):print(chunk, end="", flush=True)# 逐词打印生成的文本
复制代码
异步流式
  1. asyncfor chunk in chain.astream({"topic":"火星"}):print(chunk, end="", flush=True)
复制代码

五、Stream Events事件驱动

事件类型

通过stream_events方法捕获完整生命周期事件:
事件监听示例
  1. from langchain_core.tracers import ConsoleCallbackHandler
  2. events = chain.stream_events({"topic":"AI"},
  3.     version="v1",
  4.     config={"callbacks":[ConsoleCallbackHandler()]}# 控制台打印事件)for event in events:if event["event"]=="on_chat_model_stream":print(f"收到块: {event['data']['chunk'].content}")
复制代码
典型事件结构
  1. {"event":"on_llm_start","name":"ChatOpenAI","run_id":"uuid123","data":{"input":"龙的传说"}}
复制代码

六、综合应用示例
  1. from langchain_core.runnables import RunnablePassthrough
  2. # 完整工作流
  3. full_chain =({"context": load_context,"query": RunnablePassthrough()}# 并行执行| prompt_template
  4.     | model.with_config({"temperature":0.7})| output_parser
  5. )# 流式+事件监听asyncdefprocess_query(query:str):asyncfor event in full_chain.astream_events(
  6.         query,
  7.         version="v1",
  8.         config={"metadata":{"user_id":"123"}}):if event["event"]=="on_chat_model_stream":yield event["data"]["chunk"].content
复制代码

七、核心优势总结

特性作用技术实现
LCEL声明式组合/类型安全/优化执行操作符重载+类型系统
Runnable统一接口/异步支持/组合能力接口协议+适配器模式
Stream低延迟输出/内存优化生成器+异步迭代器
Events实时监控/调试/中间结果捕获观察者模式+回调系统
通过这四者的结合,LangChain实现了:





欢迎光临 AI创想 (http://llms-ai.com/) Powered by Discuz! X3.4