作者:CSDN博客
LangChain 进阶开发者学习笔记
零、环境配置与快速开始
0.1 安装依赖
- # 核心库
- pip install langchain langchain-core langchain-community
- # LLM 提供商(根据需要选择)
- pip install langchain-openai # OpenAI/Azure OpenAI
- pip install langchain-anthropic # Claude
- pip install langchain-google-genai # Gemini
- pip install langchain-ollama # 本地模型# 向量数据库(RAG 必需)
- pip install chromadb # 轻量级向量库
- pip install faiss-cpu # Facebook 向量检索# 工具库
- pip install langsmith # 可观测性(可选)
复制代码 0.2 配置 API Key
方式一:环境变量(推荐)- # Linux/MacexportOPENAI_API_KEY="sk-..."exportANTHROPIC_API_KEY="sk-ant-..."# Windows PowerShell$env:OPENAI_API_KEY="sk-..."
复制代码 方式二:代码中配置- import os
- os.environ["OPENAI_API_KEY"]="sk-..."# 或直接传参from langchain_openai import ChatOpenAI
- llm = ChatOpenAI(api_key="sk-...", model="gpt-4")
复制代码 0.3 验证安装
- from langchain_openai import ChatOpenAI
- from langchain_core.messages import HumanMessage
- # 初始化模型
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)# 测试调用
- response = llm.invoke([HumanMessage(content="你好,请用一句话介绍自己")])print(response.content)
复制代码 执行效果:- 我是 OpenAI 开发的 AI 助手,专注于回答问题、提供信息和协助完成各种任务。
复制代码 0.4 国内环境配置
使用代理:- import os
- os.environ["HTTP_PROXY"]="http://127.0.0.1:7890"
- os.environ["HTTPS_PROXY"]="http://127.0.0.1:7890"
复制代码 使用国内 API 中转:- llm = ChatOpenAI(
- api_key="your-key",
- base_url="https://api.openai-proxy.com/v1",# 中转地址
- model="gpt-4o-mini")
复制代码 使用本地模型(Ollama):- # 安装 Ollamacurl-fsSL https://ollama.com/install.sh |sh# 下载模型
- ollama pull qwen2.5:7b
复制代码- from langchain_ollama import ChatOllama
- llm = ChatOllama(model="qwen2.5:7b", temperature=0)
- response = llm.invoke("你好")print(response.content)
复制代码 一、架构全景:LangChain 的设计哲学
1.1 核心定位
LangChain = Spring Boot for LLM Applications
传统后端:HTTP 请求 → Controller → Service → Repository → DatabaseLangChain:用户输入 → Prompt → LLM → Tools/Memory → 结构化输出
1.2 核心组件对标
| LangChain 组件 | 传统后端类比 | 职责 | | Runnable | Interface/抽象类 | 统一调用接口(.invoke(), .stream(), .batch()) | | LCEL (Expression Language) | 管道/中间件链 | 组件编排(prompt | llm | parser) | | Chain | Service 层业务逻辑 | 封装多步骤流程 | | Agent | 动态路由控制器 | 根据输入自主决策调用哪些 Tools | | Tools | 外部 API/Repository | 执行具体操作(搜索、数据库查询、API 调用) | | Memory | Session/Redis 缓存 | 维护对话上下文 | | Retriever | 数据库查询层 | 向量检索/文档召回 |
二、基础架构:Runnable 与 LCEL
2.1 Runnable:统一调用接口
类比:Java 的 Function<T, R> 或 Spring 的 HandlerInterceptor
所有 LangChain 组件都实现 Runnable 接口,提供标准化调用方式:- from langchain_openai import ChatOpenAI
- from langchain_core.messages import HumanMessage
- import asyncio
- import time
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)# 1. 同步调用
- result = llm.invoke([HumanMessage(content="1+1等于几?")])print(f"同步调用: {result.content}")# 2. 批量调用(一次请求处理多个输入)
- batch_results = llm.batch([[HumanMessage(content="Python 之父是谁?")],[HumanMessage(content="Java 之父是谁?")]])for i, res inenumerate(batch_results):print(f"批量结果 {i+1}: {res.content}")# 3. 流式调用(实时输出)print("流式调用: ", end="")for chunk in llm.stream([HumanMessage(content="用一句话介绍 LangChain")]):print(chunk.content, end="", flush=True)print()# 4. 异步调用(高并发场景)asyncdefasync_demo():"""异步调用示例:适合需要同时处理多个请求的场景"""# 4.1 单个异步调用
- result =await llm.ainvoke([HumanMessage(content="什么是异步编程?")])print(f"\n异步调用: {result.content}")# 4.2 并发执行多个请求(性能优势明显)
- start_time = time.time()# 创建多个异步任务
- tasks =[
- llm.ainvoke([HumanMessage(content="什么是 Python?")]),
- llm.ainvoke([HumanMessage(content="什么是 Java?")]),
- llm.ainvoke([HumanMessage(content="什么是 JavaScript?")])]# 并发执行(同时发送3个请求)
- results =await asyncio.gather(*tasks)
-
- elapsed = time.time()- start_time
- print(f"\n并发执行3个请求耗时: {elapsed:.2f}秒")for i, res inenumerate(results):print(f" 结果 {i+1}: {res.content[:50]}...")# 4.3 异步流式输出print("\n异步流式调用: ", end="")asyncfor chunk in llm.astream([HumanMessage(content="用一句话总结 AI 的未来")]):print(chunk.content, end="", flush=True)print()# 运行异步示例
- asyncio.run(async_demo())
复制代码 执行效果:- 同步调用: 1+1等于2。
- 批量结果 1: Python 之父是 Guido van Rossum。
- 批量结果 2: Java 之父是 James Gosling。
- 流式调用: LangChain 是一个用于构建基于大语言模型应用的开发框架。
- 异步调用: 异步编程是一种编程范式,允许程序在等待某些操作完成时继续执行其他任务,而不是阻塞等待。
- 并发执行3个请求耗时: 1.23秒
- 结果 1: Python 是一种高级、解释型、通用的编程语言...
- 结果 2: Java 是一种面向对象的编程语言,由 Sun Microsystems...
- 结果 3: JavaScript 是一种轻量级、解释型的编程语言...
- 异步流式调用: AI 的未来将更加智能化、普及化,深度融入人类生活的各个领域。
复制代码 性能对比:
同步执行 3 个请求:约 3-4 秒(串行执行)异步并发 3 个请求:约 1-1.5 秒(并行执行)性能提升:约 3 倍
关键方法:
invoke() - 同步执行,等待完整响应ainvoke() - 异步执行(高并发场景,需配合 asyncio)stream() - 流式输出(实时响应,用户体验更好)astream() - 异步流式输出(结合异步 + 流式的优势)batch() - 批量处理(减少网络开销,提升吞吐量)
何时使用异步:
✅ Web 服务器:同时处理多个用户请求(FastAPI、Django Async)✅ 批量任务:需要调用多个 LLM 接口✅ 实时应用:聊天机器人、客服系统❌ 简单脚本:单次调用用同步即可
2.2 LCEL:管道式组件编排
类比:Spring WebFlux 的 Reactive Streams 或 Unix 管道- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from langchain_openai import ChatOpenAI
- # 管道式组装:prompt → llm → parser
- chain =(
- ChatPromptTemplate.from_template("将 {text} 翻译成英文")| ChatOpenAI(model="gpt-4o-mini", temperature=0)| StrOutputParser())# 执行
- result = chain.invoke({"text":"你好世界"})print(f"翻译结果: {result}")# 批量翻译
- batch_results = chain.batch([{"text":"早上好"},{"text":"谢谢"}])for i, res inenumerate(batch_results):print(f"批量翻译 {i+1}: {res}")
复制代码 执行效果:- 翻译结果: Hello, world
- 批量翻译 1: Good morning
- 批量翻译 2: Thank you
复制代码 核心优势:
类型安全:编译时检查输入输出类型可组合:任意 Runnable 可通过 | 连接可观测:自动集成 LangSmith 追踪
2.3 高级编排:并行与条件分支
- from langchain_core.runnables import RunnableParallel, RunnableBranch
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from langchain_openai import ChatOpenAI
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)# 1. 并行执行(类比 CompletableFuture.allOf)
- summarize_chain =(
- ChatPromptTemplate.from_template("用一句话总结:{text}")| llm | StrOutputParser())
- translate_chain =(
- ChatPromptTemplate.from_template("翻译成英文:{text}")| llm | StrOutputParser())
- sentiment_chain =(
- ChatPromptTemplate.from_template("分析情感(正面/负面/中性):{text}")| llm | StrOutputParser())
- parallel_chain = RunnableParallel(
- summary=summarize_chain,
- translation=translate_chain,
- sentiment=sentiment_chain
- )
- result = parallel_chain.invoke({"text":"今天天气真好,心情很愉快!"})print("并行执行结果:")print(f" 摘要: {result['summary']}")print(f" 翻译: {result['translation']}")print(f" 情感: {result['sentiment']}")# 2. 条件分支(类比 if-else 路由)
- chinese_chain = ChatPromptTemplate.from_template("用中文回答:{question}")| llm | StrOutputParser()
- english_chain = ChatPromptTemplate.from_template("Answer in English: {question}")| llm | StrOutputParser()
- default_chain = ChatPromptTemplate.from_template("回答:{question}")| llm | StrOutputParser()
- branch_chain = RunnableBranch((lambda x: x.get("language")=="zh", chinese_chain),(lambda x: x.get("language")=="en", english_chain),
- default_chain # 默认分支)
- result_zh = branch_chain.invoke({"language":"zh","question":"什么是 AI?"})
- result_en = branch_chain.invoke({"language":"en","question":"What is AI?"})print(f"\n中文分支: {result_zh}")print(f"英文分支: {result_en}")
复制代码 执行效果:- 并行执行结果:
- 摘要: 今天天气好,心情愉快。
- 翻译: The weather is really nice today, and I'm in a great mood!
- 情感: 正面
- 中文分支: AI(人工智能)是指通过计算机系统模拟人类智能的技术...
- 英文分支: AI (Artificial Intelligence) refers to the simulation of human intelligence...
复制代码 三、核心特性:Chains、Agents、Memory
3.1 Chain:封装业务流程
类比:Service 层的业务方法
基础 Chain
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from langchain_openai import ChatOpenAI
- # 使用 LCEL 构建简单链
- prompt = ChatPromptTemplate.from_template("用通俗易懂的语言解释 {concept}")
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
- parser = StrOutputParser()
- chain = prompt | llm | parser
- # 执行
- result = chain.invoke({"concept":"依赖注入"})print(f"解释结果:\n{result}")
复制代码 执行效果:- 解释结果:
- 依赖注入就像是你去餐厅吃饭,服务员会把菜端到你面前,而不是让你自己去厨房拿。
- 在编程中,依赖注入是指一个对象不自己创建它需要的其他对象,而是由外部传递进来,
- 这样可以让代码更灵活、更容易测试。
复制代码 RAG Chain(检索增强生成)
类比:先查数据库,再用查询结果生成响应- from langchain_core.runnables import RunnablePassthrough
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from langchain_openai import ChatOpenAI, OpenAIEmbeddings
- from langchain_community.vectorstores import Chroma
- from langchain_text_splitters import RecursiveCharacterTextSplitter
- from langchain_core.documents import Document
- # 1. 准备知识库数据
- documents =[
- Document(page_content="向量数据库是专门用于存储和检索高维向量的数据库,常用于 AI 应用。"),
- Document(page_content="Chroma 是一个轻量级的开源向量数据库,支持本地运行。"),
- Document(page_content="RAG(检索增强生成)通过检索相关文档来增强 LLM 的回答准确性。")]# 2. 创建向量存储
- vectorstore = Chroma.from_documents(
- documents=documents,
- embedding=OpenAIEmbeddings())
- retriever = vectorstore.as_retriever(search_kwargs={"k":2})# 3. 构建 RAG 链defformat_docs(docs):return"\n\n".join([d.page_content for d in docs])
- rag_chain =({"context": retriever | format_docs,# 检索并格式化文档"question": RunnablePassthrough()# 透传原始问题}| ChatPromptTemplate.from_template("基于以下上下文回答问题:\n{context}\n\n问题:{question}")| ChatOpenAI(model="gpt-4o-mini", temperature=0)| StrOutputParser())# 4. 执行查询
- result = rag_chain.invoke("什么是向量数据库?")print(f"RAG 回答:\n{result}")
复制代码 执行效果:- RAG 回答:
- 向量数据库是专门用于存储和检索高维向量的数据库系统,常用于 AI 应用场景。
- 例如 Chroma 就是一个轻量级的开源向量数据库,支持本地运行,非常适合快速原型开发。
复制代码 关键点:
retriever 负责向量检索(类比 DAO 层)RunnablePassthrough() 透传输入(类比 Spring 的 @PathVariable)format_docs() 将检索结果格式化为 Prompt 上下文
3.2 Agent:自主决策执行器
类比:动态路由 + 策略模式
Agent 会根据用户输入自主决定调用哪些 Tools,类似于:- // 伪代码if(需要搜索){
- result = searchTool.execute();}elseif(需要计算){
- result = calculatorTool.execute();}
复制代码 ReAct Agent(推理 + 行动)
注意:Tool的函数中,必须至少要有一个入参作为占位,即使在函数中没有使用- from langchain.agents import AgentExecutor, create_react_agent
- from langchain_core.tools import Tool
- from langchain_core.prompts import PromptTemplate
- from langchain_openai import ChatOpenAI
- # 1. 定义工具defsearch_func(query:str)->str:"""模拟搜索功能"""
- knowledge_base ={"2022世界杯":"2022年世界杯冠军是阿根廷","Python":"Python 是一种高级编程语言",}for key, value in knowledge_base.items():if key in query:return value
- returnf"未找到关于 '{query}' 的信息"defcalculator_func(expression:str)->str:"""计算器工具"""try:
- result =eval(expression)returnf"计算结果: {result}"except Exception as e:returnf"计算错误: {str(e)}"
- search_tool = Tool(
- name="搜索",
- func=search_func,
- description="用于查询最新信息和知识,输入应该是搜索关键词")
- calculator_tool = Tool(
- name="计算器",
- func=calculator_func,
- description="用于执行数学计算,输入应该是数学表达式,例如 '2+3*4'")
- tools =[search_tool, calculator_tool]# 2. 创建 Agent(使用 ReAct 模板)from langchain import hub
- prompt = hub.pull("hwchase17/react")
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
- agent = create_react_agent(llm, tools, prompt)# 3. 创建执行器
- agent_executor = AgentExecutor(
- agent=agent,
- tools=tools,
- verbose=True,# 打印推理过程
- max_iterations=5,
- handle_parsing_errors=True)# 4. 执行任务print("=== 任务1: 需要搜索 ===")
- result1 = agent_executor.invoke({"input":"2022年世界杯冠军是谁?"})print(f"最终答案: {result1['output']}\n")print("=== 任务2: 需要计算 ===")
- result2 = agent_executor.invoke({"input":"计算 (25 + 15) * 3"})print(f"最终答案: {result2['output']}\n")print("=== 任务3: 需要多步推理 ===")
- result3 = agent_executor.invoke({"input":"如果一本书30元,我买5本需要多少钱?"})print(f"最终答案: {result3['output']}")
复制代码 执行效果:- === 任务1: 需要搜索 ===
- > Entering new AgentExecutor chain...
- Thought: 我需要搜索2022年世界杯的信息
- Action: 搜索
- Action Input: 2022世界杯
- Observation: 2022年世界杯冠军是阿根廷
- Thought: 我现在知道答案了
- Final Answer: 2022年世界杯冠军是阿根廷
- > Finished chain.
- 最终答案: 2022年世界杯冠军是阿根廷
- === 任务2: 需要计算 ===
- > Entering new AgentExecutor chain...
- Thought: 这是一个数学计算问题
- Action: 计算器
- Action Input: (25 + 15) * 3
- Observation: 计算结果: 120
- Thought: 我得到了计算结果
- Final Answer: 120
- > Finished chain.
- 最终答案: 120
- === 任务3: 需要多步推理 ===
- > Entering new AgentExecutor chain...
- Thought: 需要计算5本书的总价
- Action: 计算器
- Action Input: 30 * 5
- Observation: 计算结果: 150
- Thought: 我得到了答案
- Final Answer: 买5本书需要150元
- > Finished chain.
- 最终答案: 买5本书需要150元
复制代码 执行流程:
Thought:分析问题 → “需要搜索最新信息”Action:选择工具 → 搜索Action Input:工具输入 → “2022世界杯”Observation:获取结果 → “2022年世界杯冠军是阿根廷”Final Answer:生成回答
自定义 Tool
- from langchain_core.tools import tool
- @tooldefquery_database(sql:str)->str:"""执行 SQL 查询并返回结果
-
- Args:
- sql: SQL 查询语句
-
- Returns:
- 查询结果的字符串表示
- """# 模拟数据库查询
- mock_db ={"SELECT * FROM users":"用户列表: [张三, 李四, 王五]","SELECT COUNT(*) FROM orders":"订单总数: 1523"}return mock_db.get(sql,f"执行查询: {sql}")# 集成到 Agent
- tools =[query_database, search_tool, calculator_tool]
- agent = create_react_agent(llm, tools, prompt)
- agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
- result = agent_executor.invoke({"input":"查询用户列表"})print(f"结果: {result['output']}")
复制代码 执行效果:- > Entering new AgentExecutor chain...
- Thought: 需要查询数据库
- Action: query_database
- Action Input: SELECT * FROM users
- Observation: 用户列表: [张三, 李四, 王五]
- Final Answer: 用户列表包括:张三、李四、王五
- > Finished chain.
- 结果: 用户列表包括:张三、李四、王五
复制代码 3.3 Memory:对话上下文管理
类比:Session 管理 + Redis 缓存
ConversationBufferMemory(完整历史)
- from langchain.memory import ConversationBufferMemory
- from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
- from langchain_core.output_parsers import StrOutputParser
- from langchain_openai import ChatOpenAI
- # 1. 创建 Memory
- memory = ConversationBufferMemory(
- memory_key="chat_history",
- return_messages=True)# 2. 构建带 Memory 的 Chain
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
- prompt = ChatPromptTemplate.from_messages([("system","你是一个友好的助手,能记住之前的对话内容"),
- MessagesPlaceholder(variable_name="chat_history"),# 历史消息占位符("user","{input}")])
- chain = prompt | llm | StrOutputParser()# 3. 对话循环defchat_with_memory(user_input:str):# 加载历史
- history = memory.load_memory_variables({})# 执行对话
- response = chain.invoke({"input": user_input,"chat_history": history.get("chat_history",[])})# 保存到 Memory
- memory.save_context({"input": user_input},{"output": response})return response
- # 测试对话print("对话1:", chat_with_memory("我叫张三"))print("对话2:", chat_with_memory("我喜欢编程"))print("对话3:", chat_with_memory("我叫什么名字?"))print("对话4:", chat_with_memory("我的爱好是什么?"))# 查看完整历史print("\n完整对话历史:")print(memory.load_memory_variables({}))
复制代码 执行效果:- 对话1: 你好张三!很高兴认识你。
- 对话2: 编程是一项很有趣的技能!你主要使用哪种编程语言呢?
- 对话3: 你叫张三。
- 对话4: 你的爱好是编程。
- 完整对话历史:
- {'chat_history': [
- HumanMessage(content='我叫张三'),
- AIMessage(content='你好张三!很高兴认识你。'),
- HumanMessage(content='我喜欢编程'),
- AIMessage(content='编程是一项很有趣的技能!你主要使用哪种编程语言呢?'),
- HumanMessage(content='我叫什么名字?'),
- AIMessage(content='你叫张三。'),
- HumanMessage(content='我的爱好是什么?'),
- AIMessage(content='你的爱好是编程。')
- ]}
复制代码 ConversationSummaryMemory(摘要压缩)
类比:定期归档旧数据,只保留摘要- from langchain.memory import ConversationSummaryMemory
- # 创建摘要 Memory
- summary_memory = ConversationSummaryMemory(
- llm=ChatOpenAI(model="gpt-4o-mini"),
- max_token_limit=100# 超过限制自动摘要)# 模拟长对话
- conversations =[("我叫张三,是一名软件工程师","你好张三!很高兴认识你这位软件工程师。"),("我在北京工作","北京是个很棒的城市!"),("我喜欢 Python 和 Java","这两门语言都很实用!"),("我最近在学习 LangChain","LangChain 是个很强大的框架!")]for user_msg, ai_msg in conversations:
- summary_memory.save_context({"input": user_msg},{"output": ai_msg})# 查看摘要(而非完整历史)print("对话摘要:")print(summary_memory.load_memory_variables({}))
复制代码 执行效果:- 对话摘要:
- {'history': '张三是一名在北京工作的软件工程师,喜欢 Python 和 Java,最近在学习 LangChain 框架。'}
复制代码 持久化 Memory(使用文件存储)
- from langchain.memory import FileChatMessageHistory, ConversationBufferMemory
- # 使用文件持久化(生产环境建议用 Redis/PostgreSQL)
- message_history = FileChatMessageHistory("chat_history.json")
- persistent_memory = ConversationBufferMemory(
- chat_memory=message_history,
- return_messages=True)# 保存对话
- persistent_memory.save_context({"input":"我的用户ID是12345"},{"output":"已记录您的用户ID"})# 重启程序后仍可读取print("持久化历史:", persistent_memory.load_memory_variables({}))
复制代码 执行效果:- 持久化历史: {'chat_history': [
- HumanMessage(content='我的用户ID是12345'),
- AIMessage(content='已记录您的用户ID')
- ]}
- # chat_history.json 文件内容:
- [
- {"type": "human", "data": {"content": "我的用户ID是12345"}},
- {"type": "ai", "data": {"content": "已记录您的用户ID"}}
- ]
复制代码 四、高级用法:自定义与优化
4.1 自定义 Agent 逻辑
- from langchain.agents import BaseSingleActionAgent
- from typing import List, Tuple, Any
- classCustomAgent(BaseSingleActionAgent):"""自定义 Agent:实现特定业务逻辑"""defplan(
- self,
- intermediate_steps: List[Tuple[Any,str]],**kwargs
- )-> Any:"""决策逻辑:选择下一步行动"""# 自定义决策算法if"搜索"in kwargs["input"]:return AgentAction(tool="search", tool_input=kwargs["input"])else:return AgentFinish(return_values={"output":"无需工具"})asyncdefaplan(self,*args,**kwargs):"""异步版本"""return self.plan(*args,**kwargs)
复制代码 4.2 复杂 RAG 链路
Multi-Query RAG(多查询召回)
- from langchain.retrievers import MultiQueryRetriever
- # 自动生成多个查询变体
- retriever = MultiQueryRetriever.from_llm(
- retriever=base_retriever,
- llm=ChatOpenAI())# 输入:"什么是 RAG?"# 自动生成:["RAG 是什么", "检索增强生成的定义", "RAG 技术原理"]
复制代码 Re-ranking(重排序)
- from langchain.retrievers import ContextualCompressionRetriever
- from langchain.retrievers.document_compressors import CohereRerank
- compressor = CohereRerank(model="rerank-english-v2.0")
- compression_retriever = ContextualCompressionRetriever(
- base_compressor=compressor,
- base_retriever=retriever
- )
复制代码 4.3 流式输出(实时响应)
- from langchain_core.prompts import ChatPromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- from langchain_openai import ChatOpenAI
- import time
- llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)# 构建链
- chain =(
- ChatPromptTemplate.from_template("写一首关于 {topic} 的五言绝句")| llm
- | StrOutputParser())# 1. 同步流式输出print("=== 同步流式输出 ===")for chunk in chain.stream({"topic":"春天"}):print(chunk, end="", flush=True)
- time.sleep(0.05)# 模拟打字机效果print("\n")# 2. 异步流式输出(高并发场景)import asyncio
- asyncdefasync_stream_demo():print("=== 异步流式输出 ===")asyncfor chunk in chain.astream({"topic":"编程"}):print(chunk, end="", flush=True)await asyncio.sleep(0.05)print("\n")# 运行异步示例
- asyncio.run(async_stream_demo())# 3. 带 Token 使用统计的流式输出from langchain_core.callbacks import StreamingStdOutCallbackHandler
- llm_with_callback = ChatOpenAI(
- model="gpt-4o-mini",
- streaming=True,
- callbacks=[StreamingStdOutCallbackHandler()])
- chain_with_callback =(
- ChatPromptTemplate.from_template("用一句话介绍 {concept}")| llm_with_callback
- | StrOutputParser())print("=== 带回调的流式输出 ===")
- result = chain_with_callback.invoke({"concept":"LangChain"})print(f"\n完整结果: {result}")
复制代码 执行效果:- === 同步流式输出 ===
- 春风拂柳绿,
- 花开满园香。
- 燕归巢中语,
- 万物复苏忙。
- === 异步流式输出 ===
- 代码如诗行,
- 逻辑织梦想。
- 键盘敲击声,
- 创造新世界。
- === 带回调的流式输出 ===
- LangChain 是一个用于构建基于大语言模型应用的开发框架。
- 完整结果: LangChain 是一个用于构建基于大语言模型应用的开发框架。
复制代码 4.4 错误处理与重试
- from langchain_core.runnables import RunnableRetry
- # 自动重试(类比 Spring Retry)
- chain_with_retry = chain.with_retry(
- stop_after_attempt=3,
- wait_exponential_multiplier=1,
- wait_exponential_max=10)# 降级处理
- chain_with_fallback = chain.with_fallbacks([backup_chain],
- exceptions_to_handle=(RateLimitError,))
复制代码 五、最佳实践
5.1 性能优化
批量处理:使用 batch() 减少网络开销异步调用:高并发场景用 ainvoke()缓存:启用 LLM 缓存减少重复调用流式输出:长文本生成用 stream()
5.2 可观测性
- from langsmith import Client
- # 集成 LangSmith 追踪
- client = Client()# 自动记录所有调用链路with client.trace("my_chain"):
- result = chain.invoke(input)
复制代码 5.3 安全性
输入验证:防止 Prompt 注入工具权限控制:限制 Agent 可调用的 Tools输出过滤:敏感信息脱敏
六、核心概念速查表
| 概念 | 用途 | 代码示例 | | Runnable | 统一接口 | component.invoke(input) | | LCEL | 管道编排 | prompt | llm | parser | | Chain | 业务流程 | LLMChain(llm, prompt) | | Agent | 自主决策 | create_react_agent(llm, tools) | | Tool | 外部能力 | @tool def func(): ... | | Memory | 上下文 | ConversationBufferMemory() | | Retriever | 文档召回 | vectorstore.as_retriever() |
原文地址:https://blog.csdn.net/hsnsnusjs/article/details/160501263 |