作者:CSDN博客
一、LangChain概述
1.1 什么是 LangChain
github地址:https://github.com/langchain-ai/langchain
官网地址:https://www.langchain.com/langchain
官方文档:https://docs.langchain.com/oss/python/langchain/overview
API 文档:https://reference.langchain.com/python/langchain/
LangChain是2022年10月,由哈佛大学的Harrison Chase(哈里森·蔡斯)发起研发的一个开源框架,用于开发由大语言模型(LLMs)驱动的应用程序。比如,搭建 Agent、问答系统(QA)、对话机器人、文档搜索系统等。LangChain 之于 LLMs,类似 Spring 之于 Java,Django 之于 Python。顾名思义,LangChain中的“Lang”是指language,即大语言模型,“Chain”即“链”,也就是将大模型与外部数据&各种组件连接成链,以此构建AI应用程序。
1.2 有哪些大模型应用开发框架
框架
| 描述
| LangChain
(Python)
| 出现最早、最成熟的,适合复杂任务分解和单Agent应用
| LlamaIndex
(Python)
| 专注于高效的索引和检索,适合 RAG 场景
| LangChain4J
(Java)
| LangChain出了Java、JavaScript(LangChain.js)两个语言的版本,LangChain4j的功能略少于LangChain,但是主要的核心功能都是有的
| SpringAI/SpringAI Alibaba
(Java)
| 有待进一步成熟,只是简单的对于一些接口进行了封装
| SemanticKernel
(C#)
| 微软推出的,对于C#同学来说,那就是5颗星
| 1.3 为什么需要LangChain
使用LangChain的好处:
简化开发难度:更简单、更高效、效果更好。开发人员可以更专注于业务逻辑,而无须花费大量时间和精力处理底层技术细节。学习成本更低:不同模型的API不同,调用方式也有区别,切换模型时学习成本高。使用LangChain,可以以统一、规范的方式进行调用,有更好的移植性。现成的链式组装:LangChain提供了一些现成的链式组装,用于完成特定的高级任务。让复杂的逻辑变得结构化、易组合、易扩展。
1.4 LangChain 使用场景
项目场景
| 技术点
| 文档问答助手
| Prompt + Embedding + RetrievalQA
| 智能日程规划助手
| Agent + Tool + Memory
| LLM+数据库问答
| SQLDatabaseToolkit + Agent
| 多模型路由对话系统
| RouterChain + 多 LLM
| 互联网智能客服
| ConversationChain + RAG +Agent
| 企业知识库助手(RAG + 本地模型)
| VectorDB + LLM + Streamlit
| 1.5 LangChain 生态位置
1.6 LangChain 架构
LangChain 帮助快速开始构建 Agent ,支持选择的任何模型提供商。LangGraph 允许通过低级编排、记忆和人工参与支持来控制自定义 Agent 的每一步。可以管理具有持久执行能力的长时间运行任务。LangSmith 是一个帮助 AI 团队使用实时生产数据进行持续测试和改进的平台。提供观测、评估与部署功能。Deep Agents 用于构建能够规划、使用子 Agent 并利用文件系统处理复杂任务的 Agent,受 Claude Code、Deep Research 和 Manus 等应用的启发。
包
| 描述
| langchain
| 包含构建使用 LLM 的应用所需的所有实现的主入口点
| langchain-core
| LangChain 生态系统中的核心接口和抽象
| langchain-text-splitters
| 用于文档处理的文本分割工具
| langchain-mcp-adapters
| 在 LangChain 和 LangGraph 应用中提供 MCP 工具
| langchain-tests
| 用于验证 LangChain 集成包实现的标准化测试套件
| langchain-classic
| 遗留的 angchain 实现和组件
| 1.7 LangChain的核心组件
LangChain的核心组件主要涉及四个部分:Model I/O、Chains、RAG、Agents。
标准化大模型的输入和输出,包含提示模版,模型调用和格式化输出。
Format(格式化):通过模板管理大模型的输入。将原始数据格式化成模型可以处理的形式,插入到一个模板中,然后送入模型进行处理。Predict(预测):调用 LLM 接收输入,进行预测或生成回答。
Parse(解析):规范化模型输出。比如将模型输出格式化为 JSON。
“链条”用于将多个组件组合成一个完整的流程,方便链式调用。
对应RAG:检索外部数据,作为参考信息输入LLM辅助生成答案
Source:多种类型的数据源:视频、图片、文本、代码、文档等。Load:将多源异构数据统一加载为文档对象。Transform:对文档进行转换和处理,比如将文本切分为小块。Embed:将文本编码为向量。Store:将向量化后的数据存储起来。Retrieve:从文本库中检索相关的文本段落。
Agent 自主规划执行步骤并使用工具来完成任务。
二、Model I/O
2.1 介绍
Model I/O 部分是与语言模型进行交互的核心组件,包括输入提示(Prompt Template)、调用模型(Model)、输出解析(Output Parser)。简单来说,就是输入、处理、输出这三个步骤。
2.2 调用模型
2.2.1 直接调用在线大模型
我的windows电脑,设置环境变量如下(主要是避免大模型api-key明文写在代码里),通过os.getenv获取api-key:
- import os
- from openai import OpenAI
- client = OpenAI(
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- completion = client.chat.completions.create(
- model="qwen-plus",
- messages=[{"role": "user", "content": "你是谁"}],
- )
- print(completion.choices[0].message.content)
复制代码
2.2.2 LangChain API调用模型
通常通过聊天模型接口访问 LLM,该接口通常以消息列表作为输入并返回一条消息作为输出。
输入:接受文本 PromptValue 或消息列表 List[BaseMessage],每条消息需指定角色(如 SystemMessage、HumanMessage、AIMessage)
输出:返回带角色的消息对象(BaseMessage 子类),通常是 AIMessage- import os
- from langchain.chat_models import init_chat_model
- from langchain.messages import SystemMessage, HumanMessage, AIMessage
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- messages = [
- SystemMessage(content="你是一个诗人"),
- HumanMessage(content="写一首关于春天的诗"),
- ]
- resp = llm.invoke(messages)
- print(type(resp)) # <class 'langchain_core.messages.ai.AIMessage'>
- print(resp.content)
复制代码
2.2.3 模型初始化相关参数
初始化一个模型最简单的方法就是使用 init_chat_model,并设置必要的参数,例如 API-Key 和模型名称。除此之外还有一些其他参数。
参数
| 说明
| model
| 模型名称或标识符
| base_url
| 发送请求的 API 端点的 URL。常由模型的提供商提供
| api_key
| 与模型提供商进行身份验证所需的 API 密钥
| temperature
| 控制模型输出的随机性。数字越高,回答越有创意;数字越低,回答越确定
| timeout
| 在取消请求之前,等待模型响应的最大时间(以秒为单位)
| max_tokens
| 限制响应中的总tokens 数量,控制输出长度
| max_retries
| 请求失败时系统尝试重新发送请求的最大次数
| 2.2.4 对话模型的 Message
对话模型的输入可以是文本提示、消息提示或是字典格式。
文本提示是字符串,适用于不需要保留对话历史的直接生成任务。消息提示是将消息对象列表输入模型,方便管理对话历史,包含系统指令以及处理多模态数据。
字典格式可以按照 OpenAI 聊天补全格式创建字典列表组成消息。一条消息通常包含 role(角色)、content(内容)、metadata(元数据)。
消息类型如下(HumanMessage、AIMessage 和 SystemMessage 是常用的消息类型。
ToolMessage 是在工具调用场景下才会使用的特殊消息类型。):
消息类型
| 描述
| SystemMessage
| 代表一组初始指令,用于引导模型的行为。可以使用系统消息来设定语气、定义模型的角色,并建立响应的指导方针
| HumanMessage
| 表示用户输入
| AIMessage
| 模型生成的响应,包括文本内容、工具调用和元数据
| ToolMessage
| 表示工具调用的输出
| 2.2.5 调用方法
聊天模型提供了三种主要的调用方法:
invoke / ainvoke
| 将单个输入转换为输出
| batch / abatch
| 批量将多个输入转换为输出
| stream / astream
| 从单个输入生成流式输出
| 带有“a”前缀的方法是异步的,需要与 asyncio 和 await 语法一起使用以实现并发。
2.2.5.1 非流式/流式输出
在Langchain中,语言模型的输出分为了两种主要的模式:流式输出与非流式输出。
非流式输出:用户提出需求请编写一首诗,系统在静默数秒后突然弹出了完整的诗歌。如同一种“提交请求,等待结果”的流程,实现简单,但体验单调。流式输出:用户提问,请编写一首诗,当问题刚刚发送,系统就开始一字一句(逐个token)进行回复,更像是“实时对话”,贴近人类交互的习惯。
- import os
- from langchain.chat_models import init_chat_model
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- messages = [
- {"role": "system", "content": "你是一名数学家"},
- {"role": "user", "content": "请证明以下黎曼猜想"},
- ]
- # resp = llm.invoke(messages)
- # print(resp.content)
- # 使用 stream() 方法流式输出
- for chunk in llm.stream(messages):
- # 逐个打印内容块,并刷新缓冲区以即时显示内容
- print(chunk.content, end="", flush=True)
复制代码 2.2.5.2 批量调用
将一组独立的请求批量发送给模型并行处理。batch 默认没有依赖底层 API 的原生批量接口,而是使用线程池并行执行多个 invoke()。所以它对 IO 密集型任务(如调用远程 LLM API)很有效。- import os
- from langchain.chat_models import init_chat_model
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- messages = [
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于春天的诗"},
- ],
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于夏天的诗"},
- ],
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于秋天的诗"},
- ],
- ]
- resp = llm.batch(messages) # 批量调用,返回一个消息列表
- print(resp)
复制代码 2.2.5.3 同步/异步调用
- import os
- import time
- from langchain.chat_models import init_chat_model
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- messagess = [
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于春天的诗"},
- ],
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于夏天的诗"},
- ],
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于秋天的诗"},
- ],
- ]
- start_time = time.time()
- resps = [llm.invoke(messages) for messages in messagess]
- print(resps)
- end_time = time.time()
- print(f"Total time: {end_time - start_time}")
- # Total time: 18.67403507232666
复制代码- import os
- import time
- import asyncio
- from langchain.chat_models import init_chat_model
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- messagess = [
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于春天的诗"},
- ],
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于夏天的诗"},
- ],
- [
- {"role": "system", "content": "你是一位诗人"},
- {"role": "user", "content": "写一首关于秋天的诗"},
- ],
- ]
- async def async_invoke():
- tasks = [llm.ainvoke(messages) for messages in messagess]
- return await asyncio.gather(*tasks)
- start_time = time.time()
- resps = asyncio.run(async_invoke())
- print(resps)
- end_time = time.time()
- print(f"Total time: {end_time - start_time}")
- # Total time: 7.498462438583374
复制代码 异步调用,允许程序在等待某些操作完成时继续执行其他任务,而不是阻塞等待。这在处理 I/O 操作(如网络请求、文件读写等)时特别有用,可以显著提高程序的效率和响应性。使用 asyncio.gather() 并行执行时,因为多个任务几乎同时开始,它们的执行时间将重叠。理想情况下,如果多个任务的执行时间相同,那么总执行时间应该接近单个任务的执行时间。
2.2.6 调用本地模型
Ollama是一个开源项目,其项目定位是:一个本地运行大模型的集成框架。目前主要针对主流的LlaMA架构的开源大模型设计,可以实现如 Qwen、Deepseek 等主流大模型的下载、启动和本地运行的自动化部署及推理流程。
目前作为一个非常热门的大模型托管平台,已被包括LangChain、Taskweaver等在内的多个热门项目高度集成。
Ollama官方地址:https://ollama.com
Ollama Github开源地址:https://github.com/ollama/ollama
本地安装部署很简单,不用过多描述。
安装完成后,运行模型(我安装的是qwen2.5:3b):- from langchain_ollama import ChatOllama
- ollama_llm = ChatOllama(model="qwen2.5:3b")
- messages = [{"role": "user", "content": "你好,请介绍一下你自己"}]
- resp = ollama_llm.invoke(messages)
- print(resp.content)
复制代码 返回:
2.3 PromptTemplate
2.3.1 PromptTemplate
PromptTemplate 用于快速构建包含变量的提示词模板,并通过传入不同的参数值生成自定义的提示词。
参数
| template
| 提示模板,包括变量占位符
| input_variables
| 需要将其值作为提示输入的变量名称列表
| partial_variables
| 提示模板携带的部分变量的字典。使用部分变量预先填充模板,无需后续在每次调用时再传递这些变量
| 方法
| format()
| 使用输入格式化提示
|
- from langchain_core.prompts import PromptTemplate
- template = PromptTemplate.from_template("{foo} {bar}")
- prompt = template.invoke({"foo": "hello", "bar": "world"})
- print(prompt, type(prompt))
- # text='hello world' <class 'langchain_core.prompt_values.StringPromptValue'>
- prompt_str = prompt.to_string()
- print(prompt_str, type(prompt_str))
- # hello world <class 'str'>
复制代码 2.3.2 ChatPromptTemplate
ChatPromptTemplate是创建聊天消息列表的提示模板。相较于普通 PromptTemplate更适合处理多角色、多轮次的对话场景。支持 System/Human/AI 等不同角色的消息模板。
实例化:
ChatPromptTemplate 可以通过构造方法或 format_messages 方法来实例化提示词模板。
实例化时需要传入 messages 参数,messages 参数支持如下格式:
tuple 构成的列表,格式为[(role, content)]dict 构成的列表,格式为[{“role”:... , “content”:...}]Message 类构成的列表
- from langchain_core.prompts import ChatPromptTemplate
- template = ChatPromptTemplate(
- [
- ("system", "你是一个AI开发工程师,你的名字是{name}。"),
- ("human", "你能帮我做什么?"),
- ("ai", "我能开发很多{thing}。"),
- ("human", "{user_input}"),
- ]
- )
- prompt = template.format_messages(name="小智AI", thing="AI", user_input="行")
- print(prompt)
复制代码 调用:
推荐使用 format_messages 方法或 invoke 方法调用。
2.4 Output Parsers
在应用开发中,大模型的输出可能是下一步逻辑处理的关键输入。因此,在这种情况下,规范化输出是必须要做的任务,以确保应用能够顺利进行后续的逻辑处理。
语言模型返回的内容通常都是文本字符串,而实际 AI 应用开发过程中有时希望模型可以返回更直观、更格式化的内容,LangChain 提供了输出解析器(Output Parser)将模型输出解析为结构化数据。
有多种类型的输出解析器,常用的有 StrOutputParser(字符串解析器)与 JsonOutputParser(JSON解析器)。
2.4.1 StrOutputParser
StrOutputParser 是一个简单的解析器,从结果中提取 content 字段。
举例:将模型输出结果解析为字符串- import os
- from langchain.chat_models import init_chat_model
- from langchain_core.output_parsers import StrOutputParser
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- messages = [
- {"role": "system", "content": "你是一个机器人"},
- {"role": "user", "content": "你好"},
- ]
- resp = llm.invoke(messages)
- print(resp)
- str_resp = StrOutputParser().invoke(resp)
- print(str_resp)
复制代码
2.4.2 JsonOutputParser
JSON 解析器用于将大模型的自由文本输出转换为结构化JSON数据的工具。特别适用于需要严格结构化输出的场景,比如 API 调用、数据存储或下游任务处理。
JsonOutputParser 能够结合 Pydantic 模型进行数据验证,自动验证字段类型和内容(如字符串、数字、嵌套对象等)- import os
- from pydantic import BaseModel, Field
- from langchain.chat_models import init_chat_model
- from langchain_core.output_parsers import JsonOutputParser
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- class Prime(BaseModel):
- prime: list[int] = Field(description="素数")
- count: list[int] = Field(description="小于该素数的素数个数")
- json_parser = JsonOutputParser(pydantic_object=Prime)
- messages = [
- {"role": "system", "content": json_parser.get_format_instructions()},
- {
- "role": "user",
- "content": "任意生成5个1000-100000之间素数,并标出小于该素数的素数个数",
- },
- ]
- resp = llm.invoke(messages)
- json_resp = json_parser.invoke(resp)
- print(json_resp)
- # {'prime': [1009, 2003, 3001, 4001, 5003], 'count': [168, 303, 430, 584, 669]}
复制代码
2.5 Structured Outputs
可以要求模型按照给定的模式格式提供其响应,这有助于确保输出可以被轻松解析并在后续处理中使用。LangChain 支持多种模式类型和强制结构化输出的方法。
三、Chains
3.1 Runnable与LCEL
Runnable 是 LangChain 中可以调用、批处理、流式传输、转换和组合的工作单元。
Runnable 接口是使用 LangChain 组件的基础,它在许多组件中实现,例如语言模型、输出解析器、检索器、编译的 LangGraph 图等。
Runnable统一调用方式:- # 分步调用
- prompt_text = prompt.invoke({"topic": "猫"}) # 方法1
- model_out = model.invoke(prompt_text) # 方法2
- result = parser.invoke(model_out) # 方法3
- # LCEL管道式
- chain = prompt | model | parser # 用管道符组合
- result = chain.invoke({"topic": "猫"}) # 所有组件统一用invoke
复制代码 LangChain 表达式语言(LCEL,LangChain Expression Language)是一种从现有的Runnable 构建新的 Runnable 的声明式方法,用于声明、组合和执行各种组件(模型、提示、工具、函数等)。
我们称使用 LCEL 创建的 Runnable 为“链”,“链”本身就是 Runnable。
LCEL 两个主要的组合原语是 RunnableSequence 和 RunnableParallel。许多其他组合原语可以被认为是这两个原语的变体。
3.2 RunnableSequence
RunnableSequence 按顺序“链接”多个可运行对象,其中一个对象的输出作为下一个对象的输入。
LCEL重载了 | 运算符,以便从两个 Runnables 创建 RunnableSequence。- chain = runnable1 | runnable2
- # 等同于
- chain = RunnableSequence([runnable1, runnable2])
复制代码 举例:提示模板➡️模型➡️输出解析器- import os
- from langchain.chat_models import init_chat_model
- from langchain_core.prompts import PromptTemplate
- from langchain_core.output_parsers import StrOutputParser
- prompt_template = PromptTemplate(
- template="讲一个关于{topic}的笑话",
- input_variables=["topic"],
- )
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- parser = StrOutputParser()
- chain = prompt_template | llm | parser
- resp = chain.invoke({"topic": "人工智能"})
- print(resp)
复制代码 3.3 RunnableParallel
RunnableParallel 同时运行多个可运行对象,并为每个对象提供相同的输入。
对于同步执行,RunnableParallel 使用 ThreadPoolExecutor 来同时运行可运行对象。
对于异步执行,RunnableParallel 使用 asyncio.gather 来同时运行可运行对象。
在 LCEL 表达式中,字典会自动转换为 RunnableParallel。- import os
- from langchain.chat_models import init_chat_model
- from langchain_core.prompts import PromptTemplate
- from langchain_core.runnables import RunnableParallel
- from langchain_core.output_parsers import StrOutputParser
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- joke_chain = (
- PromptTemplate.from_template("讲一个关于{topic}的笑话") | llm | StrOutputParser()
- )
- poem_chain = (
- PromptTemplate.from_template("写一首关于{topic}的诗歌") | llm | StrOutputParser()
- )
- map_chain = RunnableParallel(joke=joke_chain, poem=poem_chain)
- resp = map_chain.invoke({"topic": "人工智能"})
- print(resp)
复制代码 3.4 RunnableLambda
RunnableLambda 将 Python 可调用函数转换为 Runnable,使得函数可以在同步或异步上下文中使用。- from langchain_core.runnables import RunnableLambda
- @RunnableLambda
- def total_len(x):
- return len(x["text1"]) + len(x["text2"])
- chain = {
- "text1": lambda x: x + " world",
- "text2": lambda x: x + ", how are you",
- } | total_len
- result = chain.invoke("hello")
- print(result) # 29
复制代码 3.5 RunnablePassthrough
RunnablePassthrough 接收输入并将其原样输出。RunnablePassthrough 是 LangChain LCEL 体系中的“无操作节点”,用于在流水线中透传输入或保留上下文,也可以用于向输出中添加键。
举例:保留中间结果- from langchain_core.runnables import RunnablePassthrough, RunnableParallel
- chain = RunnableParallel(
- original=RunnablePassthrough(), # 保留中间结果
- word_count=lambda x: len(x),
- )
- result = chain.invoke("hello world")
- print(result) # {'original': 'hello world', 'word_count': 11}
复制代码 举例:使用 assign() 向输出中添加键- from langchain_core.runnables import RunnablePassthrough
- chain = {
- "text1": lambda x: x + " world",
- "text2": lambda x: x + ", how are you",
- } | RunnablePassthrough.assign(word_count=lambda x: len(x["text1"] + x["text2"]))
- result = chain.invoke("hello")
- print(result)
- # {'text1': 'hello world', 'text2': 'hello, how are you', 'word_count': 29
复制代码 3.6 RunnableBranch
RunnableBranch 使用 (条件,Runnable) 对列表和默认分支进行初始化。对输入进行操作时,选择第一个计算结果为 True 的条件,并在输入上运行相应的 Runnable。如果没有条件为 True,则在输入上运行默认分支。- from langchain_core.runnables import RunnableBranch
- branch = RunnableBranch(
- (lambda x: isinstance(x, str), lambda x: x.upper()),
- (lambda x: isinstance(x, int), lambda x: x + 1),
- (lambda x: isinstance(x, float), lambda x: x * 2),
- lambda x: "goodbye",
- )
- result = branch.invoke("hello")
- print(result) # HELLO
- result = branch.invoke(None)
- print(result) # goodbye
复制代码 3.7 RunnableWithFallbacks
RunnableWithFallbacks 使得 Runnable 失败后可以回退到其他 Runnable。可以直接在Runnable 上使用 with_fallbacks 方法。- import os
- from langchain.chat_models import init_chat_model
- from langchain_core.prompts import PromptTemplate
- from langchain_core.runnables import RunnableLambda
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- chain = PromptTemplate.from_template("hello") | llm
- chain_with_fallback = chain.with_fallbacks([RunnableLambda(lambda x: "sorry")])
- result = chain_with_fallback.invoke("1") # 提示词模板中没有需要填充的变量,会报错
- print(result) # sorry
复制代码 四、RAG
4.1 RAG介绍
4.1.1 大模型的局限
知识滞后:LLM 因其具有海量参数,需要花费相当的物力与时间成本进行预训练和微调,同时商用 LLM 还需要进行各种安全测试与风险评估等。因此 LLM 会存在知识滞后的问题。知识缺失:在专有领域,LLM 无法学习到所有的专业知识细节,因此在面向专业领域知识的提问时,无法给出可靠准确的回答。
幻觉:LLM 在生成回答时,可能会“胡言乱语”,这种现象称之为 LLM 的“幻觉”。“幻觉”可以体现为错误陈述、编造事实、错误的复杂推理或者复杂语境下理解能力不足等。
“幻觉”产生的原因:训练知识存在偏差,这些错误信息被 LLM 学习后在输出中复现。
4.1.2 什么是RAG
为了改善大模型在时效性、可靠性与准确性方面的不足,各种针对 LLM 优化的方法应运而生。RAG(Retrieval-Augmented Generation,检索增强生成)就是其中一种被广泛研究和应用的优化架构。
RAG 的基本思想为:将传统的生成式大模型和实时信息检索技术相结合,为大模型补充来自外部的相关数据和上下文,来帮助大模型生成更加准确可靠的内容。这使得大模型在生成内容时可以依赖实时与个性化的数据和知识,而非仅仅依赖训练知识。就相当于在大模型回答时给它一本参考书。
可以说,当应用需求集中在利用大模型去回答特定私有领域的知识,且知识库足够大时,那么除了微调大模型外,RAG 就是非常有效的一种解决方案。LangChain 对这一流程提供了解决方案。
4.1.3 RAG优缺点
RAG的优点:
相比提示词工程,RAG 有更丰富的上下文和数据样本,可以不需要用户提供过多的背景描述,就能生成比较符合用户预期的答案。相比于模型微调,RAG 可以提升问答内容的时效性和可靠性。在一定程度上保护了业务数据的隐私性。
RAG的缺点:
由于每次问答都涉及外部系统数据检索,因此 RAG 的响应时延相对较高。引用的外部知识数据会消耗大量的模型 Token 资源。
4.1.4 RAG流程
典型的RAG有两个主要流程:
索引:从数据源提取数据,构建索引。
检索生成:接受用户查询并从索引中检索相关数据,然后将其传递给模型。
索引阶段:
从各种数据源加载数据➡️
将文档切分为小块➡️
对文本块进行嵌入➡️
存储嵌入向量。
检索生成阶段:
根据用户输入,使用检索器从存储中检索相关文本块➡️
大模型使用包含问题和检索结果的提示生成回答。
4.2 文档加载
数据源可能包含多种格式的文件,如文本文档、Markdown,PDF 等。因此我们首先需要对各种格式的文件进行处理。LangChain 实现和集成了众多文档加载器,方便从不同格式的文件中加载数据。可在 https://docs.langchain.com/oss/python/integrations/document_loaders 查看所有集成的文档加载器。
LangChain 所有文档加载器都实现了 BaseLoader 接口,接口提供了通用的 load(一次加载所有文档) 与 lazy_load(以延迟方式加载文档) 方法,用于从数据源加载数据并处理为 Document 对象。
LangChain 实现了 Document 抽象,用于表示文本单元及其元数据,它包含三个属性:
page_content:文本内容字符串。metadata:包含元数据的字典,如文档的来源等。id:可选,文档标识符。
4.2.1 加载 TXT
- from langchain_community.document_loaders import TextLoader
- docs = TextLoader(
- file_path="assets/sample.txt", # 文件路径
- encoding="utf-8", # 文件编码方式
- ).load() # 返回List[Document]
- print(docs)
- # [Document(metadata={'source': 'asset/sample.txt'}, page_content='...')]
复制代码 4.2.2 加载 CSV
- # pip install langchain_community
- from langchain_community.document_loaders.csv_loader import CSVLoader
- # 加载所有列
- docs = CSVLoader(
- file_path="assets/sample.csv", # 文件路径
- ).load() # 返回List[Document]
- print(docs)
- # 加载部分列
- docs = CSVLoader(
- file_path="assets/sample.csv", # 文件路径
- metadata_columns=["title", "author"], # 将指定列作为元数据
- content_columns=["content"], # 将指定列作为内容
- ).load() # 返回List[Document]
- print(docs)
复制代码 4.2.3 加载 JSON
LangChain 实现了 JSONLoader,用来将 JSON 和 JSONL 数据转换为 LangChain 文档对象。它使用指定的 jq 模式来解析 JSON 文件,从而将特定字段提取到 LangChain 文档的内容和元数据中。
如果要从 JSON Lines 文件加载文档,需传递 json_lines=True。
详细用法可参考 https://jqlang.org/manual/#basic-filters
举例:提取所有字段- from langchain_community.document_loaders import JSONLoader
- # 提取所有字段
- docs = JSONLoader(
- file_path="assets/sample.json", # 文件路径
- jq_schema=".", # 提取所有字段
- text_content=False, # 提取内容是否为字符串格式
- ).load()
- print(docs)
复制代码 举例:提取指定字段中的内容- from langchain_community.document_loaders import JSONLoader
- # 提取所有字段
- # docs = JSONLoader(
- # file_path="assets/sample.json", # 文件路径
- # jq_schema=".", # 提取所有字段
- # text_content=False, # 提取内容是否为字符串格式
- # ).load()
- # print(docs)
- # ===========================================
- # 提取指定字段中的内容
- docs = JSONLoader(
- file_path="assets/sample.json", # 文件路径
- jq_schema=".data.items[]", # 提取data.items中的数据
- text_content=False, # 提取内容是否为字符串格式
- ).load()
- print(docs)
- docs = JSONLoader(
- file_path="assets/sample.json", # 文件路径
- jq_schema=".data.items[].content", # 提取data.items[].content中的数据
- ).load()
- print(docs)
- docs = JSONLoader(
- file_path="assets/sample.json", # 文件路径
- jq_schema="""
- .data.items[] | {
- author,
- created_at,
- content: (.title + "\n" + .content)
- }
- """, # 提取data.items中指定字段的数据
- text_content=False, # 提取内容是否为字符串格式
- ).load()
- print(docs)
复制代码 4.2.4 加载 HTML 网页
- import bs4
- from langchain_community.document_loaders import WebBaseLoader
- docs = WebBaseLoader(
- # 网址序列
- web_paths=("https://baike.baidu.com/item/%E5%BE%AE%E6%B3%A2%E7%82%89/84186",),
- # 传给 BeautifulSoup 的解析参数,parse_only 表示只提取指定标签的元素
- bs_kwargs={"parse_only": bs4.SoupStrainer(class_="J-lemma-content")},
- ).load()
- print(docs)
复制代码 4.2.5 加载 Markdown
可使用 UnstructuredMarkdownLoader 加载 Markdown 文件,需要 unstructured 包。- from langchain_community.document_loaders import UnstructuredMarkdownLoader
- docs = UnstructuredMarkdownLoader(
- # 文件路径
- file_path="assets/sample.md",
- # 加载模式:
- # single 返回单个Document对象
- # elements 按标题等元素切分文档
- mode="elements",
- ).load()
- print(docs)
复制代码 4.2.6 加载 Doc/Docx
- from langchain_community.document_loaders import UnstructuredWordDocumentLoader
- docs = UnstructuredWordDocumentLoader(
- # 文件路径
- file_path="assets/sample.docx",
- # 加载模式:
- # single 返回单个Document对象
- # elements 按标题等元素切分文档
- mode="single",
- ).load()
- print(docs)
复制代码 4.2.7 加载 PDF
PDF 存在多种来源格式,包括扫描版(图片 PDF)、电子文本版、混合版。并且布局格式也多种多样,包括单列布局、双列布局甚至竖排文本布局。并且包含段落、标题、页眉页脚、表格、数学公式、化学式、特殊符号、图片等各种元素。
因此,PDF 解析存在很多挑战。对于复杂 PDF,需要进行文本提取、布局检测、表格解析、公式识别等处理。
UnstructuredPDFLoader是对 unstructured 库的封装。支持布局识别与 OCR 提取文字。
使用UnstructuredPDFLoader,需要先下载 Poppler 和 Tesseract OCR。
Poppler 是一个开源的 PDF 文档处理库,用于渲染、解析和操作 PDF 文件。下载解压后将 .../poppler-24.08.0/Library/bin 添加到环境变量 Path 中即可。
Tesseract OCR 用于提取图片中的文字,在安装时需要选择 Additional language data(download) 来添加中文语言包。
MinerU 提供了 PDF、Word、PPT、图片等文件的解析,支持图像提取、OCR、公式、表格解析等功能。
调用在线服务:https://mineru.net/apiManage/docs。可以从本地批量上传文件进行解析,并接收解析结果。
- from langchain_community.document_loaders import UnstructuredPDFLoader
- docs = UnstructuredPDFLoader(
- file_path="assets/sample.pdf", # 文件路径
- # 加载模式:
- # single: 返回单个Document对象
- # elements: 按标题等元素切分文档
- mode="elements",
- # 加载策略:
- # fast: pdfminer 提取并处理文本
- # ocr_only: 转换为图片并进行 OCR
- # hi_res: 识别文档布局,将OCR 输出与 pdfminer 输出融合
- strategy="hi_res",
- # 推断表格结构:仅 hi_res 下起效,如果为 True 则会在表格元素的元数据中添加 text_as_html
- infer_table_structure=True,
- # OCR 使用的语言: eng 英文,chi_sim 中文简体。语言列表参考 https://github.com/tesseract-ocr/langdata
- languages=["eng", "chi_sim"],
- # 更多参数详见 https://github.com/Unstructured-IO/unstructured/blob/main/unstructured/partition/pdf.py
- ).load()
- print(docs)
复制代码 4.3 文档切分
4.3.1 为什么切分
获取 Document 对象后,需要将其切分成 Chunk。之所以要进行切分是出于以下考虑:
后续需要根据提问检索出相关的内容放入 Prompt,如果答案出现在某一个 Document 对象中,那么将检索到的整个 Document 对象直接放入 Prompt 中并不是最优的选择,因为 Document 可能包含非常多无关的信息,这些无效信息会干扰大模型的生成。
有研究发现,尽管大模型能够处理长文本输入,但它们在利用长上下文方面存在显著不足。尤其是在多文档问答和键值检索等任务中,当相关信息位于输入文本的中间时,模型的性能显著下降。这种现象表明,当前的语言模型在长输入上下文中未能充分利用信息,尤其是位于中间部分的信息。
大模型存在最大输入的 Token 限制,如果一个 Document 非常大,在输入大模型时会被截断,导致信息缺失。
基于此,一个方法是将完整的 Document 进行分块处理(Chunking),将 Document 切分为一个个小块(Chunk)。无论是在存储还是检索过程中,都将以这些块为基本单位,这样能有效地避免内容噪声干扰和超出最大 Token 的问题。
4.3.2 切分策略
按照固定字符数或 Token 数来切分,但可能会在不适当的位置切断句子。递归使用多个分隔符切分,同时尽量保证字符数或 Token 数不超出限制。能保证不切断完整的句子。语义切分:根据文本的语义内容切分,旨在保持相关信息的集中和完整,适用于需要高度语义保持的场景。但处理速度较慢,且可能出现不同块之间长度极不均衡的情况。具体切分过程为:将相邻的几个句子拼成一个句组。对所有句组进行嵌入,并比较嵌入向量的距离,找到语义变化大的位置,根据阈值确定切分点(比如计算相邻句子嵌入向量的余弦距离,取距离分布的第 N 百分位值作为阈值,高于此值则切分)。按照切分点切分出若干个语义段,并合并某些长度很短的语义段。
4.3.3 RecursiveCharacterTextSplitter
RecursiveCharacterTextSplitte(递归字符文本切分器)是最常用的切分器,它由一个字符列表作为参数,默认列表为 ["\n\n", "\n", " ", ""],并且会尝试按顺序使用这些字符进行切分,直到块足够小。由此尽可能地将所有段落(然后是句子,最后是词)保持在一起,因为这些段落通常看起来是语义上最相关的文本片段。
同时为了保证段之间语义完整,可以设置每个块之间有一部分重叠。
4.4 文档嵌入
使用嵌入模型生成文档的嵌入向量,后续检索时用于与查询的嵌入向量进行相似度计算。
2018年谷歌推出的 BERT 能够将文本嵌入为简单的向量表示,但是 BERT 并未针对有效生成句子嵌入进行优化,由此促使了 Sentence-BERT 的诞生。Sentence-BERT 调整了 BERT 的架构以及预训练任务以生成包含语义的句子嵌入向量,这些嵌入向量可以通过余弦相似度等相似性指标轻松进行比较,大大降低了查找相似句子等任务的计算开销。
常用嵌入模型:
模型
| 机构
| 描述
| bge-large-zh
| 北京智源研究院(BAAI)
| 开源,向量维度1024,序列长度512
| bge-base-zh
| BAAI
| 开源,向量维度768,序列长度512
| bge-small-zh
| BAAI
| 开源,向量维度512,序列长度512
| bge-m3
| BAAI
| 开源,多语言,向量维度1024,序列长度8192
| text-embedding-3-small
| OpenAI
| 多语言,向量维度1536,序列长度8192
| text-embedding-3-large
| OpenAI
| 多语言,向量维度3072,序列长度8192
| 4.5 向量存储
4.5.1 常用的向量数据库
LangChain提供了众多向量存储的集成,包括开源的本地向量存储与云托管的私有向量存储。并公开了一个标准接口,可以轻松地在向量存储之间进行交换。
常用向量数据库:
向量数据库
| 描述
| FAISS
| 一个用于高效相似性搜索和密集向量聚类的库
| Chroma
| 开源的轻量级向量数据库,有极简的 API
| Milvus
| 开源的专为向量搜索设计的云原生数据库。性能强悍,功能丰富。覆盖轻量级的原型开发到十亿级向量的大规模生产系统
| Pgvector
| 开源关系型数据库 PostgreSQL 的扩展,为PostgreSQL增加了向量数据类型和相似性搜索功能
| Redis
| 开源内存数据结构存储,现已原生支持向量相似性搜索功能
| Elasticsearch
| 开源分布式搜索和分析引擎,提供了一个基于文档的数据库,结构化、非结构化和向量数据通过高效的列式存储统一管理
| 4.5.2 Milvus 介绍
Milvus 通过 数据库—Collections—实体 的结构管理数据。Collections 和实体就类似关系型数据库中的表和记录。具体来说,Collection 是一个二维表,具有固定的列和变化的行。每列代表一个字段,每行代表一个实体。
Collection 通过 Collection Schema 来定义有哪些字段以及字段的类型、索引等。一个 Collection Schema 有一个主键、最多四个向量字段和若干标量字段。
主键用于唯一标识一个实体,只接受 Int64 或 VarChar 值。插入实体时,默认情况下应包含主键值。但是,如果在创建 Collections 时启用了 AutoId,Milvus 将在插入数据时生成主键值,此时插入的实体中不应包含主键值。
向量字段用于存储文本、图像和音频等非结构化数据类型的嵌入,可以是密集向量、稀疏向量或二进制向量。通常,密集向量用于语义搜索,而稀疏向量则更适合全文或词性匹配。
标量字段通常用来存储一些元数据,并可以在搜索时通过元数据进行过滤,以提高搜索结果的正确性。
字段类型
| 字段
| 描述
| 向量字段
| 密集向量
| FLOAT_VECTOR
| 32位浮点数列表
| | | FLOAT16_VECTOR
| 16位半精度浮点数列表
| | | BFLOAT16_VECTOR
| 16位浮点数列表,精度稍低,但指数范围与 Float32 相同
| | | INT8_VECTOR
| 8位有符号整数向量
| | 稀疏向量
| SPARSE_FLOAT_VECTOR
| 非零数字及其序列号列表
| | 二进制向量
| BINARY_VECTOR
| 一个0和1的列表
| 标量字段
| VARCHAR
| 字符串
| | BOOL
| 存储true或false
| | INT
| INT8、INT16、INT32、INT64
| | FLOAT
| 32位浮点数
| | DOUBLE
| 64位双精度浮点数
| | ARRAY
| 相同数据类型元素的有序集合
| | JSON
| 结构化的键值数据
| 索引是建立在数据之上的附加结构,可以加快搜索速度。不同字段数据类型适用不同的索引类型。比如 FLOAT_VECTOR 可使用 HNSW(分层导航小世界)索引,VARCHAR 可使用 INVERTED(反转)索引。详见https://milvus.io/docs/index-explained.md。
Milvus 提供了多个版本以在不同场景下选择合适的使用方式:
Milvus Lite:本地轻量化运行,通过 pip installpymilvus[milvus-lite] 即可安装。但 Milvus Lite 有一些限制,比如 Milvus Lite 仅支持 FLAT 索引类型。无论在 Collections 中指定了哪种索引类型,它都使用 FLAT 类型。Milvus Standalone:单点部署,支持通过 Docker 部署。Milvus Distributed:分布式部署,支持在 Kubernetes 集群上部署。
4.5.3 Milvus安装
有两个坑,Milvus不支持windows,所以windows系统使用本地安装运行不太好使。另外在docker上常规pull安装也不太好使,不知道是为什么,我的最终安装方案如下(使用官方安装脚本):
# 1. 清理旧容器docker rm -f milvus-standalone# 2. 下载官方脚本curl -sfL https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh -o standalone_embed.shchmod +x standalone_embed.sh# 3. 启动 Milvus./standalone_embed.sh start# 4. 查看状态./standalone_embed.sh status# 5. 查看日志tail -20 milvus.log# 6. 检查端口netstat -tulpn | grep -E "19530|9091"测试windows本地连接:- # test_connection.py
- import socket
- import time
- # 测试 TCP 连接
- host = "192.168.10.160"
- port = 19530
- print(f"测试连接到 {host}:{port}...")
- try:
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(5)
- result = sock.connect_ex((host, port))
- if result == 0:
- print(f"✓ 端口 {port} 可以连接")
- else:
- print(f"✗ 端口 {port} 无法连接,错误码: {result}")
- sock.close()
- except Exception as e:
- print(f"✗ 连接失败: {e}")
- # 测试 HTTP 连接
- import requests
- try:
- response = requests.get(f"http://{host}:9091/healthz", timeout=5)
- print(f"✓ 健康检查响应: {response.status_code} - {response.text}")
- except Exception as e:
- print(f"✗ 健康检查失败: {e}")
复制代码- 测试连接到 192.168.10.160:19530...
- ✓ 端口 19530 可以连接
- ✓ 健康检查响应: 200 - OK
复制代码 4.5.3 创建 Collection
- from pprint import pprint
- from pymilvus import MilvusClient, DataType
- # 实例化向量数据库客户端
- client = MilvusClient(
- uri="http://192.168.10.160:19530", # 数据存储在本地当前目录下
- token="root:Milvus"
- )
- # 创建 schema
- def build_schema():
- return (
- MilvusClient.create_schema(
- # 自动分配主键
- auto_id=True,
- # 启用动态字段,未在 Schema 中声明的字段会以键值对的形式存储在这个动态字段
- enable_dynamic_field=True,
- )
- # 添加 id 字段,类型为整数,设置为主键
- .add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
- # 添加 vector 字段,类型为浮点数向量,维度为 768
- .add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=768)
- # 添加 text 字段,类型为字符串,最大长度为 1024
- .add_field(field_name="text", datatype=DataType.VARCHAR, max_length=1024)
- # 添加 metadata 字段,类型为 JSON
- .add_field(field_name="metadata", datatype=DataType.JSON)
- )
- # 创建 index
- def build_index():
- index_params = MilvusClient.prepare_index_params()
- index_params.add_index(
- field_name="vector", # 建立索引的字段
- index_type="AUTOINDEX", # 索引类型
- metric_type="L2", # 向量相似度度量方式
- )
- return index_params
- # 创建 collection
- if client.has_collection(collection_name="demo_collection"):
- # 删除 collection
- # 在 Milvus 中删除数据后,存储空间不会立即释放。虽然删除数据会将实体标记为 "逻辑删除",但实际空间可能不会立即释放。
- # Milvus 会在后台自动压缩数据。这个过程会将较小的数据段合并为较大的数据段,并删除"逻辑删除"的数据或已超过有效时间的数据。
- # 一个名为 Garbage Collection (GC) 的独立进程会定期删除这些 "已删除 "的数据段,从而释放它们占用的存储空间。
- client.drop_collection(collection_name="demo_collection")
- client.create_collection(
- collection_name="demo_collection", # collection 名称
- schema=build_schema(), # collection 的 schema
- index_params=build_index(), # collection 的 index
- )
- # 查看 collection
- print(client.list_collections())
- # 查看 collection 描述
- pprint(client.describe_collection(collection_name="demo_collection"))
复制代码- ['demo_collection']
- {'aliases': [],
- 'auto_id': True,
- 'collection_id': 465315026390748425,
- 'collection_name': 'demo_collection',
- 'consistency_level': 2,
- 'created_timestamp': 465315145787375634,
- 'description': '',
- 'enable_dynamic_field': True,
- 'enable_namespace': False,
- 'fields': [{'auto_id': True,
- 'description': '',
- 'field_id': 100,
- 'is_primary': True,
- 'name': 'id',
- 'params': {},
- 'type': <DataType.INT64: 5>},
- {'description': '',
- 'field_id': 101,
- 'name': 'vector',
- 'params': {'dim': 768},
- 'type': <DataType.FLOAT_VECTOR: 101>},
- {'description': '',
- 'field_id': 102,
- 'name': 'text',
- 'params': {'max_length': 1024},
- 'type': <DataType.VARCHAR: 21>},
- {'description': '',
- 'field_id': 103,
- 'name': 'metadata',
- 'params': {},
- 'type': <DataType.JSON: 23>}],
- 'functions': [],
- 'num_partitions': 1,
- 'num_shards': 1,
- 'properties': {'timezone': 'UTC'},
- 'update_timestamp': 465315145787375634}
复制代码 4.5.4 操作实体
- import os
- from pymilvus import MilvusClient, DataType
- from langchain_text_splitters import RecursiveCharacterTextSplitter
- from langchain_community.document_loaders import UnstructuredWordDocumentLoader
- from langchain_community.embeddings import DashScopeEmbeddings
- # 从环境变量获取 API Key
- api_key = os.getenv("aliQwen-api")
- if not api_key:
- print("错误: 请设置 DASHSCOPE_API_KEY 环境变量")
- print("Windows: set DASHSCOPE_API_KEY=sk-xxxxx")
- print("Linux: export DASHSCOPE_API_KEY=sk-xxxxx")
- exit(1)
- # 实例化向量数据库客户端
- client = MilvusClient(
- uri="http://192.168.10.160:19530",
- token="root:Milvus"
- )
- # 1. 删除旧的 collection
- collection_name = "demo_collection"
- if client.has_collection(collection_name):
- print(f"删除旧 collection: {collection_name}")
- client.drop_collection(collection_name)
- print("✓ 旧 collection 已删除")
- # 2. 加载文档
- print("\n正在加载文档...")
- docs = UnstructuredWordDocumentLoader(
- file_path="assets/sample.docx",
- mode="single",
- ).load()
- # 3. 文档切分
- print("正在切分文档...")
- chunks = RecursiveCharacterTextSplitter(
- separators=["\n\n", "\n", "。", "!", "?", "……", ",", ""],
- chunk_size=300,
- chunk_overlap=50,
- ).split_documents(docs)
- print(f"切分为 {len(chunks)} 个文本块")
- # 4. 加载 Qwen 嵌入模型(从环境变量读取 API Key)
- print("\n正在加载 Qwen 嵌入模型...")
- embed_model = DashScopeEmbeddings(
- model="text-embedding-v2",
- dashscope_api_key=api_key # 使用从环境变量获取的 API Key
- )
- # 5. 计算嵌入向量
- print("正在计算嵌入向量...")
- texts = [chunk.page_content for chunk in chunks]
- embeddings = embed_model.embed_documents(texts)
- # 获取向量维度
- vector_dim = len(embeddings[0])
- print(f"嵌入向量维度: {vector_dim}")
- # 6. 创建新的 collection
- print(f"\n创建新 collection: {collection_name}")
- schema = MilvusClient.create_schema(
- auto_id=True,
- enable_dynamic_field=True,
- )
- schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
- schema.add_field(field_name="vector", datatype=DataType.FLOAT_VECTOR, dim=vector_dim)
- schema.add_field(field_name="text", datatype=DataType.VARCHAR, max_length=65535)
- schema.add_field(field_name="metadata", datatype=DataType.JSON)
- index_params = MilvusClient.prepare_index_params()
- index_params.add_index(
- field_name="vector",
- index_type="AUTOINDEX",
- metric_type="L2",
- )
- client.create_collection(
- collection_name=collection_name,
- schema=schema,
- index_params=index_params,
- )
- print("✓ Collection 创建成功")
- # 7. 准备插入数据
- print("\n准备插入数据...")
- data = []
- for chunk, embedding in zip(chunks, embeddings):
- data.append({
- "vector": embedding,
- "text": chunk.page_content,
- "metadata": chunk.metadata,
- })
- # 8. 插入数据
- print("正在插入数据...")
- res = client.insert(collection_name=collection_name, data=data)
- print(f"✓ 插入成功!共 {len(res['ids'])} 条数据")
- print(f"插入的 ID: {res['ids']}")
- # 9. 验证插入
- print("\n验证数据...")
- results = client.get(collection_name=collection_name, ids=res['ids'][:3])
- for i, result in enumerate(results):
- print(f" {i+1}. {result['text'][:100]}...")
- print(f" 向量维度: {len(result['vector'])}")
- print("\n✓ 所有操作成功!")
复制代码- import os
- from pymilvus import MilvusClient
- from langchain_community.embeddings import DashScopeEmbeddings
- from pprint import pprint
- # 从环境变量获取 API Key
- api_key = os.getenv("aliQwen-api")
- if not api_key:
- print("错误: 请设置 DASHSCOPE_API_KEY 环境变量")
- print("Windows: set DASHSCOPE_API_KEY=sk-xxxxx")
- print("Linux: export DASHSCOPE_API_KEY=sk-xxxxx")
- exit(1)
- # 实例化向量数据库客户端
- client = MilvusClient(
- uri="http://192.168.10.160:19530",
- token="root:Milvus"
- )
- collection_name = "demo_collection"
- # 加载嵌入模型
- embed_model = DashScopeEmbeddings(
- model="text-embedding-v2",
- dashscope_api_key=api_key
- )
- # ========== 1. 基本查询 ==========
- print("=" * 50)
- print("1. 基本查询")
- print("=" * 50)
- # 1.1 查看所有 collections
- print("\n所有 collections:")
- print(client.list_collections())
- # 1.2 查看 collection 信息
- print(f"\nCollection '{collection_name}' 信息:")
- pprint(client.describe_collection(collection_name=collection_name))
- # 1.3 统计 collection 中的数据量(修正)
- try:
- # 方法1:使用 get_collection_stats
- stats = client.get_collection_stats(collection_name=collection_name)
- count = stats.get('row_count', 0)
- print(f"\n数据总量: {count} 条")
- except AttributeError:
- # 方法2:如果没有 get_collection_stats,尝试其他方式
- print("\n注意: 无法直接获取数据总量,尝试查询所有数据...")
- # 尝试获取所有数据
- try:
- # 使用 query 方法获取所有数据
- results = client.query(
- collection_name=collection_name,
- output_fields=["id"],
- limit=10000
- )
- count = len(results)
- print(f"数据总量: {count} 条")
- except Exception as e:
- print(f"无法获取数据总量: {e}")
- # ========== 2. 获取所有数据 ==========
- print("\n" + "=" * 50)
- print("2. 获取所有数据")
- print("=" * 50)
- # 使用 query 获取数据
- try:
- # 获取前10条数据
- results = client.query(
- collection_name=collection_name,
- output_fields=["id", "text", "metadata"],
- limit=10
- )
- print(f"\n前 {len(results)} 条数据:")
- for i, data in enumerate(results):
- print(f"\n数据 {i + 1}:")
- print(f" ID: {data['id']}")
- print(f" 文本: {data['text'][:100]}...")
- if 'metadata' in data:
- print(f" 元数据: {data['metadata']}")
- except Exception as e:
- print(f"获取数据失败: {e}")
- # 备用方法:尝试通过 ID 范围获取
- try:
- print("\n尝试通过 ID 范围获取数据...")
- all_data = client.get(
- collection_name=collection_name,
- ids=list(range(100))
- )
- print(f"获取到 {len(all_data)} 条数据")
- for i, data in enumerate(all_data[:3]):
- print(f"\n数据 {i + 1}:")
- print(f" ID: {data['id']}")
- print(f" 文本: {data['text'][:100]}...")
- except Exception as e2:
- print(f"备用方法也失败: {e2}")
- # ========== 3. 向量相似度搜索 ==========
- print("\n" + "=" * 50)
- print("3. 向量相似度搜索")
- print("=" * 50)
- # 3.1 单向量搜索
- query_text = "机器学习算法"
- print(f"\n查询文本: '{query_text}'")
- try:
- # 计算查询向量
- query_embedding = embed_model.embed_query(query_text)
- # 执行搜索
- search_results = client.search(
- collection_name=collection_name,
- data=[query_embedding],
- limit=5, # 返回前5个最相似的结果
- output_fields=["text", "metadata"], # 返回的字段
- search_params={"metric_type": "L2", "params": {"nprobe": 10}}
- )
- print("\n搜索结果:")
- for i, hits in enumerate(search_results):
- for j, hit in enumerate(hits):
- print(f"\n 结果 {j + 1}:")
- print(f" 相似度距离: {hit['distance']:.4f}")
- print(f" ID: {hit['id']}")
- print(f" 文本: {hit['entity']['text'][:150]}...")
- if 'metadata' in hit['entity']:
- print(f" 元数据: {hit['entity']['metadata']}")
- except Exception as e:
- print(f"搜索失败: {e}")
- # 3.2 批量搜索
- print("\n" + "=" * 50)
- print("4. 批量搜索")
- print("=" * 50)
- query_texts = ["人工智能", "深度学习", "自然语言处理"]
- print(f"\n批量查询文本: {query_texts}")
- try:
- # 批量计算向量
- query_embeddings = embed_model.embed_documents(query_texts)
- # 批量搜索
- batch_results = client.search(
- collection_name=collection_name,
- data=query_embeddings,
- limit=3,
- output_fields=["text"]
- )
- for i, results in enumerate(batch_results):
- print(f"\n查询 '{query_texts[i]}' 的结果:")
- for j, hit in enumerate(results):
- print(f" {j + 1}. {hit['entity']['text'][:80]}... (距离: {hit['distance']:.4f})")
- except Exception as e:
- print(f"批量搜索失败: {e}")
- # ========== 4. 分页查询 ==========
- print("\n" + "=" * 50)
- print("5. 分页查询")
- print("=" * 50)
- try:
- # 使用 query 进行分页
- page_size = 5
- offset = 0
- print(f"\n分页查询(每页 {page_size} 条):")
- for page in range(3): # 只显示前3页
- results = client.query(
- collection_name=collection_name,
- output_fields=["id", "text"],
- limit=page_size,
- offset=offset
- )
- if not results:
- break
- print(f"\n第 {page + 1} 页:")
- for data in results:
- print(f" ID {data['id']}: {data['text'][:60]}...")
- offset += page_size
- except Exception as e:
- print(f"分页查询失败: {e}")
- # ========== 5. 高级搜索 ==========
- print("\n" + "=" * 50)
- print("6. 高级搜索 - 按特定字段过滤")
- print("=" * 50)
- if 'results' in locals() and len(search_results) > 0:
- try:
- # 使用第一个查询结果作为示例
- if len(search_results[0]) > 0:
- sample_id = search_results[0][0]['id']
- print(f"\n根据搜索结果,ID {sample_id} 的详细信息:")
- # 获取特定 ID 的详细信息
- result = client.get(
- collection_name=collection_name,
- ids=[sample_id]
- )
- if result:
- print(f" 完整文本: {result[0]['text']}")
- print(f" 向量前10个值: {result[0]['vector'][:10]}...")
- except Exception as e:
- print(f"获取详细信息失败: {e}")
- print("\n✓ 查询操作完成!")
复制代码- import os
- from pymilvus import MilvusClient
- from pprint import pprint
- # 实例化向量数据库客户端
- client = MilvusClient(
- uri="http://192.168.10.160:19530",
- token="root:Milvus"
- )
- collection_name = "demo_collection"
- print("=" * 50)
- print("删除操作示例")
- print("=" * 50)
- # 1. 查看当前数据量
- print("\n1. 查看数据统计:")
- try:
- stats = client.get_collection_stats(collection_name=collection_name)
- count_before = stats.get('row_count', 0)
- print(f"删除前数据量: {count_before}")
- except Exception as e:
- print(f"无法获取数据量: {e}")
- count_before = 0
- # 2. 获取前几条数据
- print("\n2. 获取前几条数据:")
- try:
- results = client.query(
- collection_name=collection_name,
- output_fields=["id", "text"],
- limit=5
- )
- print("前几条数据:")
- for data in results:
- print(f" ID: {data['id']}, 文本: {data['text'][:50]}...")
- except Exception as e:
- print(f"获取数据失败: {e}")
- results = []
- # ========== 1. 删除单个实体 ==========
- print("\n" + "=" * 50)
- print("3. 删除单个实体")
- print("=" * 50)
- if results:
- delete_id = results[0]['id']
- print(f"\n删除 ID: {delete_id}")
- try:
- # 执行删除
- delete_result = client.delete(
- collection_name=collection_name,
- ids=[delete_id]
- )
- print(f"删除结果: {delete_result}")
- # 验证删除
- try:
- stats = client.get_collection_stats(collection_name=collection_name)
- count_after = stats.get('row_count', 0)
- print(f"删除后数据量: {count_after}")
- except:
- pass
- except Exception as e:
- print(f"删除失败: {e}")
- # ========== 2. 删除多个实体 ==========
- print("\n" + "=" * 50)
- print("4. 删除多个实体")
- print("=" * 50)
- # 重新获取数据
- try:
- results = client.query(
- collection_name=collection_name,
- output_fields=["id"],
- limit=3
- )
- if len(results) >= 2:
- delete_ids = [results[0]['id'], results[1]['id']]
- print(f"\n删除 IDs: {delete_ids}")
- # 执行批量删除
- delete_result = client.delete(
- collection_name=collection_name,
- ids=delete_ids
- )
- print(f"删除结果: {delete_result}")
- except Exception as e:
- print(f"批量删除失败: {e}")
- # ========== 3. 清空整个 collection(谨慎) ==========
- print("\n" + "=" * 50)
- print("5. 清空整个 collection")
- print("=" * 50)
- confirm = input("\n⚠️ 是否要清空整个 collection?(y/n): ")
- if confirm.lower() == 'y':
- try:
- # 获取所有 ID
- results = client.query(
- collection_name=collection_name,
- output_fields=["id"],
- limit=10000
- )
- all_ids = [r['id'] for r in results]
- print(f"找到 {len(all_ids)} 条数据")
- # 批量删除
- batch_size = 100
- deleted_total = 0
- for i in range(0, len(all_ids), batch_size):
- batch_ids = all_ids[i:i + batch_size]
- result = client.delete(collection_name=collection_name, ids=batch_ids)
- deleted_total += len(batch_ids)
- print(f"已删除 {deleted_total}/{len(all_ids)} 条")
- print(f"\n✓ 清空完成!共删除 {deleted_total} 条数据")
- # 验证
- try:
- stats = client.get_collection_stats(collection_name=collection_name)
- final_count = stats.get('row_count', 0)
- print(f"最终数据量: {final_count}")
- except:
- pass
- except Exception as e:
- print(f"清空失败: {e}")
- else:
- print("取消清空操作")
- # ========== 4. 删除整个 collection ==========
- print("\n" + "=" * 50)
- print("6. 删除整个 collection")
- print("=" * 50)
- confirm = input("\n⚠️ 是否要删除整个 collection(包括结构和数据)?(y/n): ")
- if confirm.lower() == 'y':
- try:
- client.drop_collection(collection_name=collection_name)
- print(f"✓ Collection '{collection_name}' 已删除")
- # 验证
- collections = client.list_collections()
- print(f"现有 collections: {collections}")
- except Exception as e:
- print(f"删除失败: {e}")
- else:
- print("取消删除操作")
- print("\n✓ 删除操作完成!")
复制代码 4.6 检索与生成
检索阶段:用户输入查询➡️计算嵌入向量➡️在向量存储中检索相似向量➡️返回相似向量对应的内容。
向量相似性搜索算法:
ANN(近似近邻)和 KNN(最近邻)搜索是向量相似性搜索的常用方法。
在 KNN 搜索中,必须将向量空间中的所有向量与搜索请求中携带的查询向量进行比较,然后找出最相似的向量,费时费力。
而 ANN 搜索算法要求提供一个索引文件,记录向量 Embeddings 的排序顺序。当收到搜索请求时,使用索引文件作为参考,找到可能包含与查询向量最相似的向量嵌入的子组,根据指定的度量类型来测量查询向量与子组中的向量之间的相似度,根据与查询向量的相似度对组成员进行排序,并返回前 K 个成员。不过 ANN 搜索依赖于预建索引,搜索吞吐量、内存使用量和搜索正确性可能会因选择的索引类型而不同。
HNSW (分层导航小世界)是当下常用的一种基于图的索引算法,可以提高搜索高维浮点数向量时的性能。它具有出色的搜索精度和低延迟,但需要较高的内存开销来维护其分层图结构。该算法构建了一个多层图(类似不同缩放级别的地图),底层包含所有数据点,而上层则由从底层采样的数据点子集组成。在这种层次结构中,每一层都包含代表数据点的节点,节点之间由表示其接近程度的边连接。上层提供远距离跳转,以快速接近目标,而下层则进行细粒度搜索,以获得最准确的结果。其工作原理如下:
入口点:搜索从顶层的一个固定入口点开始,该入口点是图中的一个预定节点。贪婪搜索:算法贪婪地移动到当前层的近邻,直到无法再接近查询向量为止。上层起到导航作用,作为粗过滤器,为下层的精细搜索找到潜在的入口点。层层下降:一旦当前层达到局部最小值,算法就会利用预先建立的连接跳转到下层,并重复贪婪搜索。最后细化:这一过程一直持续到最底层,在最底层进行最后的细化步骤,找出最近的邻居。
示例:- import os
- from pymilvus import MilvusClient
- from langchain_community.embeddings import DashScopeEmbeddings
- from langchain_community.llms import Tongyi
- from pprint import pprint
- # ========== 配置部分 ==========
- # 从环境变量获取 API Key
- api_key = os.getenv("aliQwen-api")
- if not api_key:
- print("错误: 请设置 DASHSCOPE_API_KEY 环境变量")
- print("Windows: set DASHSCOPE_API_KEY=sk-xxxxx")
- print("Linux: export DASHSCOPE_API_KEY=sk-xxxxx")
- exit(1)
- # Milvus 连接配置
- MILVUS_HOST = "http://192.168.10.160:19530"
- MILVUS_TOKEN = "root:Milvus"
- COLLECTION_NAME = "demo_collection"
- # 连接 Milvus
- client = MilvusClient(
- uri=MILVUS_HOST,
- token=MILVUS_TOKEN
- )
- # 加载嵌入模型(用于检索)
- embed_model = DashScopeEmbeddings(
- model="text-embedding-v2",
- dashscope_api_key=api_key
- )
- # 加载大语言模型(用于生成回答)
- llm = Tongyi(
- model="qwen-plus",
- dashscope_api_key=api_key,
- temperature=0.7
- )
- print("=" * 80)
- print("《民法典》智能问答系统")
- print("=" * 80)
- # ========== 1. 查看数据统计 ==========
- print("\n【数据库信息】")
- print("-" * 50)
- try:
- stats = client.get_collection_stats(collection_name=COLLECTION_NAME)
- total_count = stats.get('row_count', 0)
- print(f"✓ 法律条文数量: {total_count} 条")
- except Exception as e:
- print(f"无法获取统计信息: {e}")
- total_count = 0
- # 查看前3条数据预览
- if total_count > 0:
- print("\n【数据预览】")
- print("-" * 50)
- sample_data = client.query(
- collection_name=COLLECTION_NAME,
- output_fields=["id", "text"],
- limit=3
- )
- for i, data in enumerate(sample_data, 1):
- print(f"\n{i}. 第 {data['id']} 条")
- print(f" 内容: {data['text'][:150]}...")
- # ========== 2. 简单的检索函数 ==========
- def search_law(query, top_k=3):
- """
- 检索相关法律条文
- Args:
- query: 用户问题
- top_k: 返回结果数量
- Returns:
- 检索结果列表
- """
- # 计算查询向量
- query_vector = embed_model.embed_query(query)
- # 执行检索
- results = client.search(
- collection_name=COLLECTION_NAME,
- data=[query_vector],
- limit=top_k,
- output_fields=["text"],
- search_params={"metric_type": "L2", "params": {"nprobe": 10}}
- )
- # 格式化结果
- documents = []
- for hits in results:
- for hit in hits:
- documents.append({
- 'text': hit['entity']['text'],
- 'distance': hit['distance'],
- 'score': 1 - hit['distance'] # 转换为相似度分数
- })
- return documents
- # ========== 3. RAG 问答函数 ==========
- def ask_law(question, top_k=3):
- """
- 基于民法典回答问题
- Args:
- question: 用户问题
- top_k: 检索相关条文数量
- Returns:
- 生成的回答和检索到的相关条文
- """
- print(f"\n问题: {question}")
- print("-" * 50)
- # 1. 检索相关法律条文
- print("正在检索相关法律条文...")
- relevant_laws = search_law(question, top_k)
- if not relevant_laws:
- return "未找到相关法律条文,请尝试其他问题。"
- print(f"找到 {len(relevant_laws)} 条相关法律条文")
- # 2. 构建上下文
- context = "\n\n".join([
- f"【第{i + 1}条相关法律】\n{law['text']}"
- for i, law in enumerate(relevant_laws)
- ])
- # 3. 构建 prompt
- prompt = f"""你是一个法律助手,请根据《中华人民共和国民法典》的相关条文回答用户的问题。
- 相关法律条文:
- {context}
- 用户问题:{question}
- 请基于以上法律条文给出准确、清晰的回答。如果条文中有具体规定,请引用相关条款。
- 回答:"""
- # 4. 生成回答
- print("正在生成回答...")
- answer = llm.invoke(prompt)
- return answer, relevant_laws
- # ========== 4. 测试问答 ==========
- print("\n" + "=" * 80)
- print("【法律问答测试】")
- print("=" * 80)
- # 定义测试问题
- test_questions = [
- "结婚需要什么条件?",
- "什么是夫妻共同财产?",
- "离婚时孩子归谁抚养?",
- "遗嘱怎么立才有效?",
- "什么情况下可以要求精神损害赔偿?"
- ]
- # 选择问题
- print("\n请选择要咨询的法律问题:")
- for i, q in enumerate(test_questions, 1):
- print(f"{i}. {q}")
- print("0. 自定义问题")
- choice = input("\n请输入序号 (0-5): ").strip()
- if choice == "0":
- question = input("请输入您的法律问题: ").strip()
- test_questions = [question]
- elif choice.isdigit() and 1 <= int(choice) <= len(test_questions):
- test_questions = [test_questions[int(choice) - 1]]
- else:
- print("使用默认问题测试...")
- test_questions = test_questions[:1]
- # 执行问答
- for i, question in enumerate(test_questions, 1):
- print(f"\n{'=' * 80}")
- print(f"问题 {i}: {question}")
- print('=' * 80)
- result = ask_law(question, top_k=3)
- if isinstance(result, tuple):
- answer, laws = result
- print(f"\n【回答】\n{answer}")
- # 显示参考的法律条文
- print(f"\n【参考法律条文】")
- for j, law in enumerate(laws, 1):
- print(f"\n{j}. 相似度: {law['score']:.3f}")
- print(f" 内容: {law['text'][:200]}...")
- else:
- print(f"\n{result}")
- # ========== 5. 交互式问答模式 ==========
- print("\n" + "=" * 80)
- print("【交互式法律问答】")
- print("=" * 80)
- def interactive_qa():
- """交互式问答模式"""
- print("\n进入交互式问答模式(输入 'exit' 退出)")
- print("提示: 可以咨询任何民法典相关的问题")
- print("-" * 50)
- while True:
- question = input("\n您的法律问题: ").strip()
- if question.lower() in ['exit', 'quit', 'q']:
- print("退出问答系统")
- break
- if not question:
- continue
- result = ask_law(question, top_k=3)
- if isinstance(result, tuple):
- answer, laws = result
- print(f"\n【回答】\n{answer}")
- # 可选:显示参考条文
- show_laws = input("\n是否查看相关法律条文?(y/n): ").strip().lower()
- if show_laws == 'y':
- print(f"\n【相关法律条文】")
- for j, law in enumerate(laws, 1):
- print(f"\n{j}. 相似度: {law['score']:.3f}")
- print(f" {law['text'][:150]}...")
- else:
- print(f"\n{result}")
- # 询问是否启动交互式问答
- print("\n是否启动交互式问答模式?")
- choice = input("输入 'y' 启动,其他键退出: ").strip().lower()
- if choice == 'y':
- interactive_qa()
- # ========== 6. 简单示例:单次问答 ==========
- print("\n" + "=" * 80)
- print("【简单问答示例】")
- print("=" * 80)
- print("\n示例: 查询结婚条件")
- question = "结婚需要什么条件?"
- result = ask_law(question, top_k=2)
- if isinstance(result, tuple):
- answer, laws = result
- print(f"\n问题: {question}")
- print(f"\n回答: {answer}")
- print(f"\n参考法律条文:")
- for law in laws:
- print(f" - {law['text'][:100]}...")
- print("\n" + "=" * 80)
- print("系统运行完成!")
- print("=" * 80)
复制代码 五、Agents
5.1 Agent介绍
通用人工智能(AGI)将是 AI 的终极形态,几乎已成为业界共识。同样,构建 Agent则是 AI 工程应用当下的“终极形态”。
语言模型本身无法采取行动——它们只是输出文本。LangChain 的一个重要功能是创建Agent。Agent 是一种使用 LLM 作为推理引擎的系统,它决定要采取哪些行动以及这些行动的输入应该是什么。这些行动的结果可以反馈给 Agent,由 Agent 决定是否需要采取更多行动,或者是否可以完成。
与传统的固定流程链不同,Agent 具备一定的自主决策能力,更适合处理开放式、多步骤的问题。它可以拆解任务,根据任务动态决定调用哪些工具,并利用中间结果推进任务。
Agent 的核心能力/组件:
大模型(LLM):作为大脑,提供推理、规划和知识理解能力。记忆(Memory):具备短期记忆和长期记忆,支持快速知识检索。工具(Tools):调用外部工具(如API、数据库)的执行单元。规划(Planning):任务分解、反思与自省框架实现复杂任务处理。行动(Action):实际执行决策的能力。协作:通过与其他 Agent 交互合作,完成更复杂的任务目标。
5.2 Tools
5.2.1 Tools 介绍
要构建更强大的 AI 工程应用,只有生成文本这样的“纸上谈兵”能力自然是不够的,借助工具,才能让 AI 应用的能力真正具备无限的可能。
工具封装了一个可调用函数及其输入模式。这些参数可以传递给兼容的聊天模型,从而允许模型决定是否调用工具以及调用哪些参数。在这种情况下,工具调用使模型能够生成符合指定输入模式的请求。
5.2.2 创建工具
一个 Tool 通常包括工具名称,工具描述,以及工具参数的类型注解。
可以通过 @tool 装饰器来创建工具。
举例:通过 @tool 创建工具- from langchain.tools import tool
- @tool
- def add_number(a: int, b: int) -> int:
- """两个整数相加"""
- return a + b
- print(f"{add_number.name=}\n{add_number.description=}\n{add_number.args=}")
- # add_number.name='add_number'
- # add_number.description='两个整数相加'
- # add_number.args={'a': {'title': 'A', 'type': 'integer'}, 'b': {'title': 'B', 'type': 'integer'}}
复制代码 5.2.3 绑定工具
要想让大模型能够使用工具,首先需要将工具给到大模型。
创建模型实例,并通过 bind_tools 方法将工具绑定到大模型。
大模型通过分析用户需求,判断是否需要调用工具。如果需要则在响应的 additional_kwargs 参数中包含工具调用的详细信息。使用模型提供的参数执行工具。
示例:- import os
- from langchain.tools import tool
- from langchain.chat_models import init_chat_model
- @tool
- def query_user_info(user_id: int) -> str:
- """查询用户信息"""
- return {1001: "Jack", 1002: "Tom", 1003: "Alice"}[user_id]
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- # 为模型提供工具
- tools = [query_user_info]
- llm_with_tools = llm.bind_tools(tools)
- resp = llm_with_tools.invoke("帮我查下1001用户的信息")
- print(resp)
- # content='\n\n我来帮您查询1001用户的信息。\n'
- # additional_kwargs={'tool_calls': [{'id': '...', 'function': {'arguments': '{"user_id": 1001}', 'name': 'query_user_info'}
- # 返回的响应中 additional_kwargs 参数中包括了工具调用的信息,此时还没有调用工具,只是返回了要调用的工具及参数
- # 手动执行工具
- for tool_call in resp.tool_calls:
- tool_name = tool_call["name"] # 获取工具名称
- tool_args = tool_call["args"] # 获取工具参数
- tool_result = globals()[tool_name].invoke(tool_args) # 执行工具
- print(tool_name, tool_args, tool_result)
复制代码 5.3 构建 Agent
使用 create_agent 来创建 Agent,create_agent 使用 LangGraph 构建基于图的 Agent运行。此 Agent 会在一个循环中反复调用模型和工具,直到某次模型输出中不再包含工具调用则结束。
使用 create_agent 创建 Agent 时,需传入模型和工具、可选地也可以传入系统提示词。
这里使用 Tavily (搜索引擎)作为工具,需要先获取它的 API-Key 并添加到环境变量。- # pip install langchain-tavily
- import os
- from langchain_tavily import TavilySearch
- from langchain.agents import create_agent
- from langchain.chat_models import init_chat_model
- # 定义模型
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- # 从环境变量读取你原先设置的 api key
- tavily_key = os.getenv("tavily-api")
- print(f"读到 API Key: {tavily_key[:20]}...") # 调试输出
- # 定义 Tavily 搜索工具
- search = TavilySearch(tavily_api_key=tavily_key,max_results=5)
- tools = [search]
- # 创建 Agent
- agent = create_agent(
- model=llm, # 模型
- tools=tools, # 工具
- system_prompt="你是位助手,需要调用工具来帮助用户。", # 系统提示词
- )
- # 调用 Agent
- res = agent.invoke(
- {"messages": [{"role": "user", "content": "今天北京的天气怎么样?"}]}
- )
- print(res)
复制代码 如果 Agent 执行多个步骤,这可能需要一些时间。为了显示中间进度,我们可以使用 stream 流式返回消息。举例:- # pip install langchain-tavily
- import os
- from langchain_tavily import TavilySearch
- from langchain.agents import create_agent
- from langchain.chat_models import init_chat_model
- # 定义模型
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- # 从环境变量读取你原先设置的 api key
- tavily_key = os.getenv("tavily-api")
- print(f"读到 API Key: {tavily_key[:20]}...") # 调试输出
- # 定义 Tavily 搜索工具
- search = TavilySearch(tavily_api_key=tavily_key,max_results=5)
- tools = [search]
- # 创建 Agent
- agent = create_agent(model=llm, tools=tools)
- # 调用 Agent
- for chunk in agent.stream(
- {
- "messages": [
- {"role": "system", "content": "你是位助手,需要调用工具来帮助用户。"},
- {"role": "user", "content": "今天北京的天气怎么样?"},
- ]
- }
- ):
- print(chunk, end="\n\n")
复制代码
5.4 LangSmith
使用 LangChain 构建的许多应用程序都包含多个步骤,需要多次调用 LLM。随着这些应用程序变得越来越复杂,能够检查链或 Agent 内部的具体情况变得至关重要。最好的方法是使用 LangSmith。
注册 LangSmith,在 Settings ➡️ API Keys 下创建 API-Key 并复制。之后在环境变量中添加以开始记录跟踪:
LANGSMITH_TRACING="true"
LANGSMITH_API_KEY="..."
配置好环境变量之后,可在 LangSmith 的 Tracing Projects 中查看跟踪记录。
LangSmith 默认将跟踪记录到 default 项目,可通过 LANGSMITH_PROJECT 环境变量设置 LangSmith 跟踪记录保存到哪个项目,如果该项目不存在则会创建。
5.5 记忆
为了给代理添加短期记忆(线程级持久化),在创建代理时需要指定一个 checkpointer,并在调用代理时指定线程 ID。这个短期记忆的能力是借助 LangGraph 的状态和检查点实现的。
示例:- import os
- import datetime
- from langchain_tavily import TavilySearch
- from langchain.agents import create_agent
- from langchain.chat_models import init_chat_model
- from langgraph.checkpoint.memory import InMemorySaver
- os.environ["LANGSMITH_PROJECT"] = "agent-with-memory"
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- # 从环境变量读取你原先设置的 api key
- tavily_key = os.getenv("tavily-api")
- print(f"读到 API Key: {tavily_key[:20]}...") # 调试输出
- # 定义 Tavily 搜索工具
- search = TavilySearch(tavily_api_key=tavily_key,max_results=5)
- tools = [search]
- # 创建 Agent
- agent = create_agent(model=llm, tools=tools,checkpointer=InMemorySaver())
- # 调用
- for chunk in agent.stream(
- input={
- "messages": [
- {
- "role": "system",
- "content": f"当前时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
- },
- {"role": "user", "content": "今天北京天气怎么样?"},
- ]
- },
- config={"configurable": {"thread_id": "1"}},
- ):
- print(chunk, end="\n\n")
- for chunk in agent.stream(
- input={
- "messages": [
- {
- "role": "system",
- "content": f"当前时间:{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
- },
- {"role": "user", "content": "上海呢?"},
- ]
- },
- config={"configurable": {"thread_id": "1"}},
- ):
- print(chunk, end="\n\n")
复制代码 5.6 MCP
5.6.1 MCP介绍
Model Context Protocol(MCP,模型上下文协议)是一个开源协议,它标准化了大语言模型与外部工具和数据源通信的方式,允许开发者和工具提供商只需集成一次,就能与任何兼容 MCP 的系统交互。MCP 就像 USB-C 标准:不需要为每个设备使用不同的连接器,而是使用一个端口来处理多种类型的连接。
5.6.2 MCP架构
MCP 遵循客户端-服务器架构,架构中包括:
MCP 主机
| 协调和管理一个或多个 MCP 客户端的 AI 应用
| MCP 客户端
| 一个保持与 MCP 服务器连接的组件,通过 MCP 定义的消息处理通信,从服务器查找并请求资源和工具,并管理与服务器的连接生命周期
| MCP 服务器
| 一个向 MCP 客户端提供服务的程序,通过协议暴露工具、资源和提示模板功能
|
5.6.3 MCP层级
MCP 分为两个层级:
数据层实现了一个基于 JSON-RPC 2.0 的交换协议,该协议定义了消息结构和语义。
数据层包括生命周期管理(连接初始化、能力协商、连接终止)、服务器功能(提供工具、资源和提示模板)、客户端功能(调用LLM、获取输入、记录消息)、其他功能(实时更新通知、长时运行操作跟踪)。
传输层定义了客户端与服务器之间数据交换的通信机制和通道,包括特定传输方式的连接建立、消息帧定界和授权。
MCP 支持多种传输机制,包括 Stdio、Streamable HTTP、SSE。
Stdio
| 使用标准输入和输出流,与在终端输入命令并看到响应时使用的机制相同。适用于本地开发
| Streamable HTTP
| 该传输使用 HTTP POST 和 GET 请求,服务器可以选择使用SSE来流式传输多个服务器消息。支持流式传输和服务器到客户端通知,并支持标准 HTTP 身份验证方法,包括授权令牌、API 密钥和自定义头信息
| SSE
| 带有 SSE(Server-Sent Events 服务器发送事件)的 HTTP,MCP早期传输机制,现逐渐被 Streamable HTTP 取代
| 5.6.4 MCP工作流程
在初始化过程中,AI 应用程序的 MCP 客户端管理器连接到配置的服务器,并将它们的能力存储起来以供后续使用。应用程序使用这些信息来确定哪些服务器可以提供特定类型的功能(工具、资源、提示),以及它们是否支持实时更新。
初始化有几个重要的作用:
协议版本协商
| 确保客户端和服务器使用兼容的协议版本,避免因版本不一致导致的通信问题
| 能力发现
| 声明各自支持的功能,包括他们能够处理的基元类型(工具、资源、提示)以及是否支持通知等特性
| 身份交换
| 交换客户端与服务器的身份及版本信息,便于后续的调试与兼容性管理
|
AI 应用程序从所有连接的 MCP 服务器中获取可用工具,并将它们组合成一个语言模型可以访问的统一工具注册表。这使得 LLM 能够理解它可以执行哪些操作,并在对话期间自动生成相应的工具调用。
连接建立之后,客户端可以通过发送 tools/list 请求来发现可用的工具。这个请求是 MCP 工具发现机制的基础—它允许客户端在尝试使用工具之前了解服务器上有哪些可用的工具。响应包含一个 tools 数组,该数组提供了关于每个可用工具的全面元数据。这种基于数组的结构允许服务器同时展示多个工具,同时保持不同功能之间的清晰界限。响应中的每个工具包括几个关键字段:
name
| 工具标识符
| title
| 工具的易读显示名称
| description
| 工具描述
| inputSchema
| 一个定义预期输入参数的 JSON Schema,支持类型验证并提供关于必需和可选参数的清晰文档
|
当语言模型在对话中决定使用工具时,AI 应用程序会拦截工具调用,将其路由到合适的 MCP 服务器,执行该工具,并将结果作为对话流程的一部分返回给 LLM。这使 LLM 能够访问实时数据并在外部世界中执行操作。
客户端使用 tools/call 方法执行一个工具。tools/call 请求遵循结构化格式,确保客户端和服务器之间的类型安全和清晰通信。请求结构包括几个重要组件:
name
| 工具标识符
| arguments
| 包含工具的 inputSchema 定义的输入参数
| 响应返回一个内容对象数组,允许进行丰富、多格式的响应(文本、图片、资源等)。每个内容对象都有一个 type 字段。
MCP 支持实时通知,使服务器能够在未经明确请求的情况下通知客户端有关变更。当 AI 应用程序收到关于工具变更的通知时,它会立即刷新其工具注册表并更新 LLM 的可用功能。这确保了正在进行的对话始终能够访问最新的一组工具,并且 LLM 可以随着新功能的可用而动态适应。
5.6.5 MCP SDK
可通过 mcp 包来简单创建 Stdio 服务器。
服务端 mcp_server_stdio.py:- import mcp
- # pip add mcp
- from mcp.server.fastmcp import FastMCP
- # 创建 MCP 实例
- mcp = FastMCP("Demo")
- # 为 MCP 实例添加工具
- @mcp.tool()
- def add(a: int, b: int) -> int:
- return a + b
- # 为 MCP 实例添加资源
- @mcp.resource("greeting://default")
- def get_greeting() -> str:
- return "Hello from static resource!"
- # 为 MCP 实例添加提示词
- @mcp.prompt()
- def greet_user(name: str, style: str = "friendly") -> str:
- styles = {
- "friendly": "写一句友善的问候",
- "formal": "写一句正式的问候",
- "casual": "写一句轻松的问候",
- }
- return f"为{name}{styles.get(style, styles['friendly'])}"
- if __name__ == "__main__":
- mcp.run(transport="stdio")
复制代码 客户端:- # pip install mcp
- import asyncio
- from mcp.client.stdio import stdio_client
- from mcp import ClientSession, StdioServerParameters
- async def stdio_run():
- server_params = StdioServerParameters(
- command="python",
- args=["McpServer.py"],
- )
- async with stdio_client(server_params) as (read, write):
- async with ClientSession(read, write) as session:
- # 初始化连接
- await session.initialize()
- # 获取可用工具
- tools = await session.list_tools()
- print(tools)
- print()
- # 调用工具
- call_res = await session.call_tool("add", {"a": 1, "b": 2})
- print(call_res)
- print()
- # 获取可用资源
- resources = await session.list_resources()
- print(resources)
- print()
- # 调用资源
- read_res = await session.read_resource("greeting://default")
- print(read_res)
- print()
- # 获取可用提示
- prompts = await session.list_prompts()
- print(prompts)
- print()
- # 调用提示
- get_res = await session.get_prompt("greet_user", {"name": "Jack"})
- print(get_res)
- print()
- asyncio.run(stdio_run())
复制代码 服务端:- # pip add mcp
- from mcp.server.fastmcp import FastMCP
- # 创建 MCP 实例
- mcp = FastMCP("Demo")
- # 为 MCP 实例添加工具
- @mcp.tool()
- def add(a: int, b: int) -> int:
- return a + b
- # 为 MCP 实例添加资源
- @mcp.resource("greeting://default")
- def get_greeting() -> str:
- return "Hello from static resource!"
- # 为 MCP 实例添加提示词
- @mcp.prompt()
- def greet_user(name: str, style: str = "friendly") -> str:
- styles = {
- "friendly": "写一句友善的问候",
- "formal": "写一句正式的问候",
- "casual": "写一句轻松的问候",
- }
- return f"为{name}{styles.get(style, styles['friendly'])}"
- if __name__ == "__main__":
- # mcp.settings.host = "0.0.0.0"
- # mcp.settings.port = 8888
- mcp.run(transport="streamable-http") # 默认启动在 127.0.0.1:8000
复制代码 客户端:- # pip install mcp
- import asyncio
- from mcp import ClientSession
- from mcp.client.streamable_http import streamablehttp_client
- async def streamablehttp_run():
- url = "http://127.0.0.1:8000/mcp"
- headers = {"Authorization": "Bearer sk-atguigu"}
- async with streamablehttp_client(url, headers) as (read, write, _):
- async with ClientSession(read, write) as session:
- # 初始化连接
- await session.initialize()
- # 获取可用工具
- tools = await session.list_tools()
- print(tools)
- print()
- # 调用工具
- call_res = await session.call_tool("add", {"a": 1, "b": 2})
- print(call_res)
- print()
- # 获取可用资源
- resources = await session.list_resources()
- print(resources)
- print()
- # 调用资源
- read_res = await session.read_resource("greeting://default")
- print(read_res)
- print()
- # 获取可用提示
- prompts = await session.list_prompts()
- print(prompts)
- print()
- # 调用提示
- get_res = await session.get_prompt("greet_user", {"name": "Jack"})
- print(get_res)
- print()
- asyncio.run(streamablehttp_run())
复制代码 5.6.6 LangChain 使用 MCP
LangChain Agent 可以通过 langchain-mcp-adapters 包使用 MCP 服务器上定义的工具。
这里使用了如下工具,需要先在相关平台创建 API-Key 并添加到环境变量:
阿里云百炼:https://bailian.console.aliyun.com/?tab=mcp#/mcp-market/detail/WebSearch
Smithery:https://smithery.ai/server/@DeniseLewis200081/rail
示例:- # pip install langchain_mcp_adapters
- import os
- import asyncio
- from urllib.parse import urlencode
- from langchain.agents import create_agent
- from langchain.chat_models import init_chat_model
- from langchain_mcp_adapters.client import MultiServerMCPClient
- # 配置 MCP 客户端
- mcp_client = MultiServerMCPClient(
- {
- "WebSearch": {
- "transport": "streamable_http",
- "url": "https://dashscope.aliyuncs.com/api/v1/mcps/WebSearch/mcp",
- "headers": {"Authorization": f"Bearer {os.getenv('aliQwen-api')}"},
- }, # https://bailian.console.aliyun.com/?tab=mcp#/mcp-market/detail/WebSearch
- "RailService": {
- "transport": "streamable_http",
- "url": f"{'https://server.smithery.ai/@DeniseLewis200081/rail/mcp'}?{urlencode({'api_key': os.getenv('smith-api-key')})}",
- }, # https://smithery.ai/server/@DeniseLewis200081/rail
- }
- )
- # 获取工具
- tools = asyncio.run(mcp_client.get_tools())
- # 定义模型
- llm = init_chat_model(
- model="qwen-plus",
- model_provider="openai",
- base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
- api_key=os.getenv("aliQwen-api"),
- )
- # 创建 Agent
- agent = create_agent(model=llm, tools=tools)
- # 调用 Agent
- async def main():
- async for chunk in agent.astream(
- {
- "messages": [
- {"role": "system", "content": "你是位助手,需要调用工具来帮助用户。"},
- {
- "role": "user",
- "content": "北京今天天气怎么样,要是还不错的话,帮我看看今天上海到北京的车票",
- },
- ]
- }
- ):
- print(chunk, end="\n\n")
- asyncio.run(main())
复制代码 原文地址:https://blog.csdn.net/weixin_39764056/article/details/159514943 |