开启左侧

基于LangChain+LLM的本地知识库问答:从企业单文档问答到批...

[复制链接]
admin 发表于 10 小时前 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
前言

过去半年,随着ChatGPT的火爆,直接带火了整个LLM这个方向,然LLM毕竟更多是基于过去的经验数据预训练而来,没法获取最新的知识,以及各企业私有的知识

    为了获取最新的知识,ChatGPT plus版集成了bing搜索的功能,有的模型则会调用一个定位于 “链接各种AI模型、工具”的langchain的bing功能为了处理企业私有的知识,要么基于开源模型微调,要么更可以基于langchain里集成的向量数据库和LLM搭建本地知识库问答(此处的量数据库的独特性在哪呢?举个例子,传统数据库做图片检索可能是通过关键词去搜索,向量数据库是通过语义搜索图片中相同或相近的向量并呈现结果)
所以越来越多的人开始关注langchain并把它与LLM结合起来应用,更直接推动了数据库、知识图谱与LLM的结合应用(详见下一篇文章:知识图谱实战导论:从什么是KG到LLM与KG/DB的结合实战)
本文则侧重讲解

    什么是LangChain及langchain的整体组成架构通过langchain-ChatGLM构建本地知识库问答的基本流程,与每个流程背后的逻辑解读langchain-ChatGLM项目的关键源码,不只是把它当做一个工具使用,因为对工具的原理更了解,则对工具的使用更顺畅
    一开始解读不易,因为涉及的项目、技术点不少,所以一开始容易绕晕,好在根据该项目的流程一步步抽丝剥茧之后,给大家呈现了清晰的代码架构
    过程中,我从接触该langchain-ChatGLM项目到整体源码梳理清晰并写清楚历时了近一周,而大家有了本文之后,可能不到一天便可以理清了(提升近7倍效率) ​​​,这便是本文的价值和意义之一langchain-ChatGLM项目的升级版:langchain-Chatchat我司基于langchain-chatchat二次开发的企业多文档知识库问答系统
阅读过程中若有任何问题,欢迎随时留言,会一一及时回复/解答,共同探讨、共同深挖
第一部分 LangChain的整体组成架构:LLM的外挂/功能库

通俗讲,所谓langchain (官网地址GitHub地址),即把AI中常用的很多功能都封装成库,且有调用各种商用模型API、开源模型的接口,支持以下各种组件


​初次接触的朋友一看这么多组件可能直接晕了(封装的东西非常多,感觉它想把LLM所需要用到的功能/工具都封装起来),为方便理解,我们可以先从大的层面把整个langchain库划分为三个大层:基础层、能力层、应用层

1.1 基础层:models、LLMs、index

1.1.1 Models:模型

各种类型的模型和模型集成,比如OpenAI的各个API/GPT-4等等,为各种不同基础模型提供统一接口
比如通过API完成一次问答
  1. import os
  2. os.environ["OPENAI_API_KEY"] = '你的api key'
  3. from langchain.llms import OpenAI

  4. llm = OpenAI(model_name="text-davinci-003",max_tokens=1024)
  5. llm("怎么评价人工智能")
复制代码

得到的回答如下图所示



1.1.2 LLMS层

这一层主要强调对models层能力的封装以及服务化输出能力,主要有:

    各类LLM模型管理平台:强调的模型的种类丰富度以及易用性一体化服务能力产品:强调开箱即用差异化能力:比如聚焦于Prompt管理(包括提示管理、提示优化和提示序列化)、基于共享资源的模型运行模式等等
比如Google's PaLM Text APIs,再比如 llms/openai.py 文件下
  1.         model_token_mapping = {
  2.             "gpt-4": 8192,
  3.             "gpt-4-0314": 8192,
  4.             "gpt-4-0613": 8192,
  5.             "gpt-4-32k": 32768,
  6.             "gpt-4-32k-0314": 32768,
  7.             "gpt-4-32k-0613": 32768,
  8.             "gpt-3.5-turbo": 4096,
  9.             "gpt-3.5-turbo-0301": 4096,
  10.             "gpt-3.5-turbo-0613": 4096,
  11.             "gpt-3.5-turbo-16k": 16385,
  12.             "gpt-3.5-turbo-16k-0613": 16385,
  13.             "text-ada-001": 2049,
  14.             "ada": 2049,
  15.             "text-babbage-001": 2040,
  16.             "babbage": 2049,
  17.             "text-curie-001": 2049,
  18.             "curie": 2049,
  19.             "davinci": 2049,
  20.             "text-davinci-003": 4097,
  21.             "text-davinci-002": 4097,
  22.             "code-davinci-002": 8001,
  23.             "code-davinci-001": 8001,
  24.             "code-cushman-002": 2048,
  25.             "code-cushman-001": 2048,
  26.         }
复制代码

1.1.3 Index(索引):Vector方案、KG方案

对用户私域文本、图片、PDF等各类文档进行存储和检索(相当于结构化文档,以便让外部数据和模型交互),具体实现上有两个方案:一个Vector方案、一个KG方案
1.1.3.1 Index(索引)之Vector方案

对于Vector方案:即对文件先切分为Chunks,在按Chunks分别编码存储并检索,可参考此代码文件:langchain/libs/langchain/langchain/indexes /vectorstore.py
该代码文件依次实现
模块导入:导入了各种类型检查、数据结构、预定义类和函数
接下来,实现了一个函数_get_default_text_splitter,两个类VectorStoreIndexWrapper、VectorstoreIndexCreator
_get_default_text_splitter 函数:
这是一个私有函数,返回一个默认的文本分割器,它可以将文本递归地分割成大小为1000的块,且块与块之间有重叠
  1. # 默认的文本分割器函数
  2. def _get_default_text_splitter() -> TextSplitter:
  3.     return RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
复制代码

为什么要进行切割?

原因很简单, embedding(text2vec,文本转化为向量)以及 LLM encoder 对输入 tokens 都有限制。embedding 会将一个 text(长字符串)的语义信息压缩成一个向量,但其对 text 包含的 tokens 是有限制的,一段话压缩成一个向量是 ok,但一本书压缩成一个向量可能就丢失了绝大多数语义
接下来是,VectorStoreIndexWrapper 类:
这是一个包装类,主要是为了方便地访问和查询向量存储(Vector Store)
  • vectorstore: 一个向量存储对象的属性  
    1.     vectorstore: VectorStore  # 向量存储对象

    2.     class Config:
    3.         """Configuration for this pydantic object."""

    4.         extra = Extra.forbid            # 额外配置项
    5.         arbitrary_types_allowed = True  # 允许任意类型
    复制代码

  • query: 一个方法,它接受一个问题字符串并查询向量存储来获取答案  
    1. # 查询向量存储的函数
    2. def query(
    3.     self,
    4.     question: str,                                          # 输入的问题字符串
    5.     llm: Optional[BaseLanguageModel] = None,                # 可选的语言模型参数,默认为None
    6.     retriever_kwargs: Optional[Dict[str, Any]] = None,      # 提取器的可选参数,默认为None
    7.     **kwargs: Any                                           # 其他关键字参数
    8. ) -> str:
    9.     """Query the vectorstore."""                            # 函数的文档字符串,描述函数的功能

    10.     # 如果没有提供语言模型参数,则使用OpenAI作为默认语言模型,并设定温度参数为0
    11.     llm = llm or OpenAI(temperature=0)                     

    12.     # 如果没有提供提取器的参数,则初始化为空字典
    13.     retriever_kwargs = retriever_kwargs or {}               

    14.     # 创建一个基于语言模型和向量存储提取器的检索QA链
    15.     chain = RetrievalQA.from_chain_type(
    16.         llm, retriever=self.vectorstore.as_retriever(**retriever_kwargs), **kwargs
    17.     )

    18.     # 使用创建的QA链运行提供的问题,并返回结果
    19.     return chain.run(question)
    复制代码

    解释一下上面出现的提取器 提取器首先从大型语料库中检索与问题相关的文档或片段,然后生成器根据这些检索到的文档生成答案。
    提取器可以基于许多不同的技术,包括:
        a.基于关键字的检索:使用关键字匹配来查找相关文档
         b.向量空间模型:将文档和查询都表示为向量,并通过计算它们之间的相似度来检索相关文档
         c.基于深度学习的方法:使用预训练的神经网络模型(如BERT、RoBERTa等)将文档和查询编码为向量,并进行相似度计算
         d.索引方法:例如倒排索引,这是搜索引擎常用的技术,可以快速找到包含特定词或短语的文档
    这些方法可以独立使用,也可以结合使用,以提高检索的准确性和速度
  • query_with_sources: 类似于query,但它还返回与查询结果相关的数据源  
    1.     # 查询向量存储并返回数据源的函数
    2.     def query_with_sources(
    3.         self,
    4.         question: str,
    5.         llm: Optional[BaseLanguageModel] = None,
    6.         retriever_kwargs: Optional[Dict[str, Any]] = None,
    7.         **kwargs: Any
    8.     ) -> dict:
    9.         """Query the vectorstore and get back sources."""
    10.         llm = llm or OpenAI(temperature=0)              # 默认使用OpenAI作为语言模型
    11.         retriever_kwargs = retriever_kwargs or {}       # 提取器参数
    12.         chain = RetrievalQAWithSourcesChain.from_chain_type(
    13.             llm, retriever=self.vectorstore.as_retriever(**retriever_kwargs), **kwargs
    14.         )
    15.         return chain({chain.question_key: question})
    复制代码

最后是VectorstoreIndexCreator 类:
这是一个创建向量存储索引的类
  • vectorstore_cls: 使用的向量存储类,默认为Chroma  
    1.     vectorstore_cls: Type[VectorStore] = Chroma              # 默认使用Chroma作为向量存储类
    复制代码

    一个简化的向量存储可以看作是一个大型的表格或数据库,其中每行代表一个项目(如文档、图像、句子等),而每个项目则有一个与之关联的高维向量。向量的维度可以从几十到几千,取决于所使用的嵌入模型
    例如:   
    Item IDVector (in a high dimensional space)
    1[0.34, -0.2, 0.5, ...]
    2[-0.1, 0.3, -0.4, ...]
    ......
    至于这里的Chroma是一种常见的向量数据库,可以通过与LangChain的集成,实现基于语言模型的各种应用   
  • embedding: 使用的嵌入类,默认为OpenAIEmbeddings  
    1.     embedding: Embeddings = Field(default_factory=OpenAIEmbeddings)  # 默认使用OpenAIEmbeddings作为嵌入类
    复制代码

    顺带说一下,Huggingface 有一个 embedding 的 benchmark:https://huggingface.co/spaces/mteb/leaderboard
  • text_splitter: 用于分割文本的文本分割器  
    1.     text_splitter: TextSplitter = Field(default_factory=_get_default_text_splitter)  # 默认文本分割器
    复制代码

  • from_loaders: 从给定的加载器列表中创建一个向量存储索引  
    1.     # 从加载器创建向量存储索引的函数
    2.     def from_loaders(self, loaders: List[BaseLoader]) -> VectorStoreIndexWrapper:
    3.         """Create a vectorstore index from loaders."""
    4.         docs = []
    5.         for loader in loaders:              # 遍历加载器
    6.             docs.extend(loader.load())      # 加载文档
    7.         return self.from_documents(docs)
    复制代码

  • from_documents: 从给定的文档列表中创建一个向量存储索引  
    1.     # 从文档创建向量存储索引的函数
    2.     def from_documents(self, documents: List[Document]) -> VectorStoreIndexWrapper:
    3.         """Create a vectorstore index from documents."""
    4.         sub_docs = self.text_splitter.split_documents(documents)      # 分割文档
    5.         vectorstore = self.vectorstore_cls.from_documents(
    6.             sub_docs, self.embedding, **self.vectorstore_kwargs       # 从文档创建向量存储
    7.         )
    8.         return VectorStoreIndexWrapper(vectorstore=vectorstore)       # 返回向量存储的包装对象
    复制代码

1.1.3.2 Index(索引)之KG方案

对于KG方案:这部分利用LLM抽取文件中的三元组,将其存储为KG供后续检索,可参考此代码文件:langchain/libs/langchain/langchain/indexes /graph.py
  1. """Graph Index Creator."""                     # 定义"图索引创建器"的描述

  2. # 导入相关的模块和类型定义
  3. from typing import Optional, Type              # 导入可选类型和类型的基础类型
  4. from langchain import BasePromptTemplate       # 导入基础提示模板
  5. from langchain.chains.llm import LLMChain      # 导入LLM链
  6. from langchain.graphs.networkx_graph import NetworkxEntityGraph, parse_triples  # 导入Networkx实体图和解析三元组的功能
  7. from langchain.indexes.prompts.knowledge_triplet_extraction import (  # 从知识三元组提取模块导入对应的提示
  8.     KNOWLEDGE_TRIPLE_EXTRACTION_PROMPT,
  9. )
  10. from langchain.pydantic_v1 import BaseModel                      # 导入基础模型
  11. from langchain.schema.language_model import BaseLanguageModel    # 导入基础语言模型的定义

  12. class GraphIndexCreator(BaseModel):  # 定义图索引创建器类,继承自BaseModel
  13.     """Functionality to create graph index."""   # 描述该类的功能为"创建图索引"

  14.     llm: Optional[BaseLanguageModel] = None      # 定义可选的语言模型属性,默认为None
  15.     graph_type: Type[NetworkxEntityGraph] = NetworkxEntityGraph  # 定义图的类型,默认为NetworkxEntityGraph

  16.     def from_text(
  17.         self, text: str, prompt: BasePromptTemplate = KNOWLEDGE_TRIPLE_EXTRACTION_PROMPT
  18.     ) -> NetworkxEntityGraph:                  # 定义一个方法,从文本中创建图索引
  19.         """Create graph index from text."""    # 描述该方法的功能
  20.         if self.llm is None:                   # 如果语言模型为None,则抛出异常
  21.             raise ValueError("llm should not be None")
  22.         graph = self.graph_type()  # 创建一个新的图
  23.         chain = LLMChain(llm=self.llm, prompt=prompt)  # 使用当前的语言模型和提示创建一个LLM链
  24.         output = chain.predict(text=text)      # 使用LLM链对文本进行预测
  25.         knowledge = parse_triples(output)      # 解析预测输出得到的三元组
  26.         for triple in knowledge:               # 遍历所有的三元组
  27.             graph.add_triple(triple)           # 将三元组添加到图中
  28.         return graph  # 返回创建的图

  29.     async def afrom_text(             # 定义一个异步版本的from_text方法
  30.         self, text: str, prompt: BasePromptTemplate = KNOWLEDGE_TRIPLE_EXTRACTION_PROMPT
  31.     ) -> NetworkxEntityGraph:
  32.         """Create graph index from text asynchronously."""  # 描述该异步方法的功能
  33.         if self.llm is None:          # 如果语言模型为None,则抛出异常
  34.             raise ValueError("llm should not be None")
  35.         graph = self.graph_type()     # 创建一个新的图
  36.         chain = LLMChain(llm=self.llm, prompt=prompt)       # 使用当前的语言模型和提示创建一个LLM链
  37.         output = await chain.apredict(text=text)   # 异步使用LLM链对文本进行预测
  38.         knowledge = parse_triples(output)          # 解析预测输出得到的三元组
  39.         for triple in knowledge:                   # 遍历所有的三元组
  40.             graph.add_triple(triple)               # 将三元组添加到图中
  41.         return graph                               # 返回创建的图
复制代码

另外,为了索引,便不得不牵涉以下这些能力

    Document Loaders,文档加载的标准接口
    与各种格式的文档及数据源集成,比如Arxiv、Email、Excel、Markdown、PDF(所以可以做类似ChatPDF这样的应用)、Youtube …

    相近的还有
    docstore,其中包含wikipedia.py等
    document_transformersembeddings​(langchain/libs/langchain/langchain/embeddings),则涉及到各种embeddings算法,分别体现在各种代码文件中:
    elasticsearch.py、google_palm.py、gpt4all.py、huggingface.py、huggingface_hub.py
    llamacpp.py、minimax.py、modelscope_hub.py、mosaicml.py
    openai.py
    sentence_transformer.py、spacy_embeddings.py、tensorflow_hub.py、vertexai.py

1.2 能力层:Chains、Memory、Tools

如果基础层提供了最核心的能力,能力层则给这些能力安装上手、脚、脑,让其具有记忆和触发万物的能力,包括:Chains、Memory、Tool三部分
1.2.1 Chains:链接

简言之,相当于包括一系列对各种组件的调用,可能是一个 Prompt 模板,一个语言模型,一个输出解析器,一起工作处理用户的输入,生成响应,并处理输出
具体而言,则相当于按照不同的需求抽象并定制化不同的执行逻辑,Chain可以相互嵌套并串行执行,通过这一层,让LLM的能力链接到各行各业

    比如与Elasticsearch数据库交互的:elasticsearch_database
  • 比如基于知识图谱问答的:graph_qa 其中的代码文件:chains/graph_qa/base.py 便实现了一个基于知识图谱实现的问答系统,具体步骤为
    首先,根据提取到的实体在知识图谱中查找相关的信息「这是通过 self.graph.get_entity_knowledge(entity) 实现的,它返回的是与实体相关的所有信息,形式为三元组」
    然后,将所有的三元组组合起来,形成上下文
    最后,将问题和上下文一起输入到qa_chain,得到最后的答案
    1.    entities = get_entities(entity_string)  # 获取实体列表。
    2.         context = ""               # 初始化上下文。
    3.         all_triplets = []          # 初始化三元组列表。
    4.         for entity in entities:    # 遍历每个实体
    5.             all_triplets.extend(self.graph.get_entity_knowledge(entity))  # 获取实体的所有知识并加入到三元组列表中。
    6.         context = "\n".join(all_triplets)          # 用换行符连接所有的三元组作为上下文。
    7.         
    8.         # 打印完整的上下文。
    9.         _run_manager.on_text("Full Context:", end="\n", verbose=self.verbose)
    10.         _run_manager.on_text(context, color="green", end="\n", verbose=self.verbose)
    11.         
    12.         # 使用上下文和问题获取答案。
    13.         result = self.qa_chain(
    14.             {"question": question, "context": context},
    15.             callbacks=_run_manager.get_child(),
    16.         )
    17.         return {self.output_key: result[self.qa_chain.output_key]}  # 返回答案
    复制代码

    比如能自动生成代码并执行的:llm_math等等
  • 比如面向私域数据的:qa_with_sources,其中的这份代码文件 chains/qa_with_sources/vector_db.py则是使用向量数据库的问题回答,核心在于以下两个函数
    reduce_tokens_below_limit  
    1. # 定义基于向量数据库的问题回答类
    2. class VectorDBQAWithSourcesChain(BaseQAWithSourcesChain):
    3.     """Question-answering with sources over a vector database."""
    4.    
    5.     # 定义向量数据库的字段
    6.     vectorstore: VectorStore = Field(exclude=True)

    7.     """Vector Database to connect to."""
    8.     # 定义返回结果的数量
    9.     k: int = 4

    10.     # 是否基于token限制来减少返回结果的数量
    11.     reduce_k_below_max_tokens: bool = False

    12.     # 定义返回的文档基于token的最大限制
    13.     max_tokens_limit: int = 3375

    14.     # 定义额外的搜索参数
    15.     search_kwargs: Dict[str, Any] = Field(default_factory=dict)

    16.     # 定义函数来根据最大token限制来减少文档
    17.     def _reduce_tokens_below_limit(self, docs: List[Document]) -> List[Document]:
    18.         num_docs = len(docs)

    19.         # 检查是否需要根据token减少文档数量
    20.         if self.reduce_k_below_max_tokens and isinstance(
    21.             self.combine_documents_chain, StuffDocumentsChain
    22.         ):
    23.             tokens = [
    24.                 self.combine_documents_chain.llm_chain.llm.get_num_tokens(
    25.                     doc.page_content
    26.                 )
    27.                 for doc in docs
    28.             ]
    29.             token_count = sum(tokens[:num_docs])

    30.             # 减少文档数量直到满足token限制
    31.             while token_count > self.max_tokens_limit:
    32.                 num_docs -= 1
    33.                 token_count -= tokens[num_docs]

    34.         return docs[:num_docs]
    复制代码

      _get_docs  
    1.     # 获取相关文档的函数
    2.     def _get_docs(
    3.         self, inputs: Dict[str, Any], *, run_manager: CallbackManagerForChainRun
    4.     ) -> List[Document]:
    5.         question = inputs[self.question_key]

    6.         # 从向量存储中搜索相似的文档
    7.         docs = self.vectorstore.similarity_search(
    8.             question, k=self.k, **self.search_kwargs
    9.         )
    10.         return self._reduce_tokens_below_limit(docs)
    复制代码

    比如面向SQL数据源的:sql_database,可以重点关注这份代码文件:chains/sql_database/query.py比如面向模型对话的:chat_models,包括这些代码文件:__init__.py、anthropic.py、azure_openai.py、base.py、fake.py、google_palm.py、human.py、jinachat.py、openai.py、promptlayer_openai.py、vertexai.py
另外,还有比较让人眼前一亮的:
constitutional_ai:对最终结果进行偏见、合规问题处理的逻辑,保证最终的结果符合价值观
llm_checker:能让LLM自动检测自己的输出是否有没有问题的逻辑
1.2.2 Memory:记忆

简言之,用来保存和模型交互时的上下文状态,处理长期记忆

具体而言,这层主要有两个核心点:


  对Chains的执行过程中的输入、输出进行记忆并结构化存储,为下一步的交互提供上下文,这部分简单存储在Redis即可


  根据交互历史构建知识图谱,根据关联信息给出准确结果,对应的代码文件为:memory/kg.py
  1. # 定义知识图谱对话记忆类
  2. class ConversationKGMemory(BaseChatMemory):
  3.     """知识图谱对话记忆类

  4.     在对话中与外部知识图谱集成,存储和检索对话中的知识三元组信息。
  5.     """

  6.     k: int = 2  # 考虑的上下文对话数量
  7.     human_prefix: str = "Human"  # 人类前缀
  8.     ai_prefix: str = "AI"  # AI前缀
  9.     kg: NetworkxEntityGraph = Field(default_factory=NetworkxEntityGraph)  # 知识图谱实例
  10.     knowledge_extraction_prompt: BasePromptTemplate = KNOWLEDGE_TRIPLE_EXTRACTION_PROMPT          # 知识提取提示
  11.     entity_extraction_prompt: BasePromptTemplate = ENTITY_EXTRACTION_PROMPT  # 实体提取提示
  12.     llm: BaseLanguageModel                  # 基础语言模型
  13.     summary_message_cls: Type[BaseMessage] = SystemMessage  # 总结消息类
  14.     memory_key: str = "history"             # 历史记忆键

  15.     def load_memory_variables(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
  16.         """返回历史缓冲区。"""
  17.         entities = self._get_current_entities(inputs)  # 获取当前实体

  18.         summary_strings = []
  19.         for entity in entities:  # 对于每个实体
  20.             knowledge = self.kg.get_entity_knowledge(entity)      # 获取与实体相关的知识
  21.             if knowledge:
  22.                 summary = f"On {entity}: {'. '.join(knowledge)}."  # 构建总结字符串
  23.                 summary_strings.append(summary)
  24.         context: Union[str, List]
  25.         if not summary_strings:
  26.             context = [] if self.return_messages else ""
  27.         elif self.return_messages:
  28.             context = [
  29.                 self.summary_message_cls(content=text) for text in summary_strings
  30.             ]
  31.         else:
  32.             context = "\n".join(summary_strings)

  33.         return {self.memory_key: context}

  34.     @property
  35.     def memory_variables(self) -> List[str]:
  36.         """始终返回记忆变量列表。"""
  37.         return [self.memory_key]

  38.     def _get_prompt_input_key(self, inputs: Dict[str, Any]) -> str:
  39.         """获取提示的输入键。"""
  40.         if self.input_key is None:
  41.             return get_prompt_input_key(inputs, self.memory_variables)
  42.         return self.input_key

  43.     def _get_prompt_output_key(self, outputs: Dict[str, Any]) -> str:
  44.         """获取提示的输出键。"""
  45.         if self.output_key is None:
  46.             if len(outputs) != 1:
  47.                 raise ValueError(f"One output key expected, got {outputs.keys()}")
  48.             return list(outputs.keys())[0]
  49.         return self.output_key

  50.     def get_current_entities(self, input_string: str) -> List[str]:
  51.         """从输入字符串中获取当前实体。"""
  52.         chain = LLMChain(llm=self.llm, prompt=self.entity_extraction_prompt)
  53.         buffer_string = get_buffer_string(
  54.             self.chat_memory.messages[-self.k * 2 :],
  55.             human_prefix=self.human_prefix,
  56.             ai_prefix=self.ai_prefix,
  57.         )
  58.         output = chain.predict(
  59.             history=buffer_string,
  60.             input=input_string,
  61.         )
  62.         return get_entities(output)

  63.     def _get_current_entities(self, inputs: Dict[str, Any]) -> List[str]:
  64.         """获取对话中的当前实体。"""
  65.         prompt_input_key = self._get_prompt_input_key(inputs)
  66.         return self.get_current_entities(inputs[prompt_input_key])

  67.     def get_knowledge_triplets(self, input_string: str) -> List[KnowledgeTriple]:
  68.         """从输入字符串中获取知识三元组。"""
  69.         chain = LLMChain(llm=self.llm, prompt=self.knowledge_extraction_prompt)
  70.         buffer_string = get_buffer_string(
  71.             self.chat_memory.messages[-self.k * 2 :],
  72.             human_prefix=self.human_prefix,
  73.             ai_prefix=self.ai_prefix,
  74.         )
  75.         output = chain.predict(
  76.             history=buffer_string,
  77.             input=input_string,
  78.             verbose=True,
  79.         )
  80.         knowledge = parse_triples(output)  # 解析三元组
  81.         return knowledge

  82.     def _get_and_update_kg(self, inputs: Dict[str, Any]) -> None:
  83.         """从对话历史中获取并更新知识图谱。"""
  84.         prompt_input_key = self._get_prompt_input_key(inputs)
  85.         knowledge = self.get_knowledge_triplets(inputs[prompt_input_key])
  86.         for triple in knowledge:
  87.             self.kg.add_triple(triple)  # 向知识图谱中添加三元组

  88.     def save_context(self, inputs: Dict[str, Any], outputs: Dict[str, str]) -> None:
  89.         """将此对话的上下文保存到缓冲区。"""
  90.         super().save_context(inputs, outputs)
  91.         self._get_and_update_kg(inputs)

  92.     def clear(self) -> None:
  93.         """清除记忆内容。"""
  94.         super().clear()
  95.         self.kg.clear()  # 清除知识图谱内容
复制代码

1.2.3 Tools层,工具

其实Chains层可以根据LLM + Prompt执行一些特定的逻辑,但是如果要用Chain实现所有的逻辑不现实,可以通过Tools层也可以实现,Tools层理解为技能比较合理,典型的比如搜索、Wikipedia、天气预报、ChatGPT服务等等
1.3 应用层:Agents

1.3.1 Agents:代理

简言之,有了基础层和能力层,我们可以构建各种各样好玩的,有价值的服务,这里就是Agent
具体而言,Agent 作为代理人去向 LLM 发出请求,然后采取行动,且检查结果直到工作完成,包括LLM无法处理的任务的代理 (例如搜索或计算,类似ChatGPT plus的插件有调用bing和计算器的功能)
比如,Agent 可以使用维基百科查找 Barack Obama 的出生日期,然后使用计算器计算他在 2023 年的年龄
  1. # pip install wikipedia
  2. from langchain.agents import load_tools
  3. from langchain.agents import initialize_agent
  4. from langchain.agents import AgentType

  5. tools = load_tools(["wikipedia", "llm-math"], llm=llm)
  6. agent = initialize_agent(tools,
  7.                          llm,
  8.                          agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
  9.                          verbose=True)


  10. agent.run("奥巴马的生日是哪天? 到2023年他多少岁了?")
复制代码

此外,关于Wikipedia可以关注下这个代码文件:langchain/docstore/wikipedia.py ...
最终langchain的整体技术架构可以如下图所示 (查看高清大图,此外,这里还有另一个架构图)



第二部分 基于LangChain + ChatGLM-6B(23年7月初版)的本地知识库问答

2.1 核心步骤:如何通过LangChain+LLM实现本地知识库问答

2023年7月,GitHub上有一个利用 langchain 思想实现的基于本地知识库的问答应用:langchain-ChatGLM (这是其GitHub地址,当然还有和它类似的但现已支持Vicuna-13b的项目,比如LangChain-ChatGLM-Webui ),目标期望建立一套对中文场景与开源模型支持友好、可离线运行的知识库问答解决方案
本项目实现原理如下图所示 (与基于文档的问答 大同小异,过程包括:1 加载文档 -> 2 读取文档 -> 3/4文档分割 -> 5/6 文本向量化 -> 8/9 问句向量化 -> 10 在文档向量中匹配出与问句向量最相似的top k个 -> 11/12/13 匹配出的文本作为上下文和问题一起添加到prompt中 -> 14/15提交给LLM生成回答 )



如你所见,这种通过组合langchain+LLM的方式,特别适合一些垂直领域或大型集团企业搭建通过LLM的智能对话能力搭建企业内部的私有问答系统,也适合个人专门针对一些英文paper进行问答,比如比较火的一个开源项目:ChatPDF,其从文档处理角度来看,实现流程如下(图源):



2.2 Facebook AI Similarity Search(FAISS):高效向量相似度检索

Faiss的全称是Facebook AI Similarity Search (官方介绍页GitHub地址),是FaceBook的AI团队针对大规模相似度检索问题开发的一个工具,使用C++编写,有python接口,对10亿量级的索引可以做到毫秒级检索的性能
简单来说,Faiss的工作,就是把我们自己的候选向量集封装成一个index数据库,它可以加速我们检索相似向量TopK的过程,其中有些索引还支持GPU构建
2.2.1 Faiss检索相似向量TopK的基本流程

Faiss检索相似向量TopK的工程基本都能分为三步:
  • 得到向量库  
    1. import numpy as np
    2. d = 64                                           # 向量维度
    3. nb = 100000                                      # index向量库的数据量
    4. nq = 10000                                       # 待检索query的数目
    5. np.random.seed(1234)            
    6. xb = np.random.random((nb, d)).astype('float32')
    7. xb[:, 0] += np.arange(nb) / 1000.                # index向量库的向量
    8. xq = np.random.random((nq, d)).astype('float32')
    9. xq[:, 0] += np.arange(nq) / 1000.                # 待检索的query向量
    复制代码

  • 用faiss 构建index,并将向量添加到index中
    其中的构建索引选用暴力检索的方法FlatL2,L2代表构建的index采用的相似度度量方法为L2范数,即欧氏距离  
    1. import faiss         
    2. index = faiss.IndexFlatL2(d)            
    3. print(index.is_trained)         # 输出为True,代表该类index不需要训练,只需要add向量进去即可
    4. index.add(xb)                   # 将向量库中的向量加入到index中
    5. print(index.ntotal)             # 输出index中包含的向量总数,为100000
    复制代码

  • 用faiss index 检索,检索出TopK的相似query  
    1. k = 4                     # topK的K值
    2. D, I = index.search(xq, k)# xq为待检索向量,返回的I为每个待检索query最相似TopK的索引list,D为其对应的距离
    3. print(I[:5])
    4. print(D[-5:])
    复制代码

    打印输出为:
    >>>
    [[  0 393 363  78]
      [  1 555 277 364]
      [  2 304 101  13]
      [  3 173  18 182]
      [  4 288 370 531]]  
    [[ 0.          7.17517328  7.2076292   7.25116253]  
      [ 0.          6.32356453  6.6845808   6.79994535]  
      [ 0.          5.79640865  6.39173603  7.28151226]  
      [ 0.          7.27790546  7.52798653  7.66284657]  
      [ 0.          6.76380348  7.29512024  7.36881447]]
2.2.2 FAISS构建索引的多种方式

构建index方法和传参方法可以为
  1. dim, measure = 64, faiss.METRIC_L2
  2. param = 'Flat'
  3. index = faiss.index_factory(dim, param, measure)
复制代码


    dim为向量维数最重要的是param参数,它是传入index的参数,代表需要构建什么类型的索引;measure为度量方法,目前支持两种,欧氏距离和inner product,即内积。因此,要计算余弦相似度,只需要将vecs归一化后,使用内积度量即可
此文,现在faiss官方支持八种度量方式,分别是:

    METRIC_INNER_PRODUCT(内积)METRIC_L1(曼哈顿距离)METRIC_L2(欧氏距离)METRIC_Linf(无穷范数)METRIC_Lp(p范数)METRIC_BrayCurtis(BC相异度)METRIC_Canberra(兰氏距离/堪培拉距离)METRIC_JensenShannon(JS散度)
2.2.2.1 Flat :暴力检索


    优点:该方法是Faiss所有index中最准确的,召回率最高的方法,没有之一;缺点:速度慢,占内存大。使用情况:向量候选集很少,在50万以内,并且内存不紧张。注:虽然都是暴力检索,faiss的暴力检索速度比一般程序猿自己写的暴力检索要快上不少,所以并不代表其无用武之地,建议有暴力检索需求的同学还是用下faiss。构建方法:
  1. dim, measure = 64, faiss.METRIC_L2
  2. param = 'Flat'
  3. index = faiss.index_factory(dim, param, measure)
  4. index.is_trained                                   # 输出为True
  5. index.add(xb)                                      # 向index中添加向量
复制代码

2.2.2.2 IVFx Flat :倒排暴力检索


    优点:IVF主要利用倒排的思想,在文档检索场景下的倒排技术是指,一个kw后面挂上很多个包含该词的doc,由于kw数量远远小于doc,因此会大大减少了检索的时间。在向量中如何使用倒排呢?可以拿出每个聚类中心下的向量ID,每个中心ID后面挂上一堆非中心向量,每次查询向量的时候找到最近的几个中心ID,分别搜索这几个中心下的非中心向量。通过减小搜索范围,提升搜索效率。缺点:速度也还不是很快。使用情况:相比Flat会大大增加检索的速度,建议百万级别向量可以使用。参数:IVFx中的x是k-means聚类中心的个数构建方法:
  1. dim, measure = 64, faiss.METRIC_L2
  2. param = 'IVF100,Flat'                           # 代表k-means聚类中心为100,   
  3. index = faiss.index_factory(dim, param, measure)
  4. print(index.is_trained)                          # 此时输出为False,因为倒排索引需要训练k-means,
  5. index.train(xb)                                  # 因此需要先训练index,再add向量
  6. index.add(xb)                                    
复制代码

2.2.2.3 PQx :乘积量化


    优点:利用乘积量化的方法,改进了普通检索,将一个向量的维度切成x段,每段分别进行检索,每段向量的检索结果取交集后得出最后的TopK。因此速度很快,而且占用内存较小,召回率也相对较高。缺点:召回率相较于暴力检索,下降较多。使用情况:内存及其稀缺,并且需要较快的检索速度,不那么在意召回率参数:PQx中的x为将向量切分的段数,因此,x需要能被向量维度整除,且x越大,切分越细致,时间复杂度越高构建方法:
  1. dim, measure = 64, faiss.METRIC_L2
  2. param =  'PQ16'
  3. index = faiss.index_factory(dim, param, measure)
  4. print(index.is_trained)                          # 此时输出为False,因为倒排索引需要训练k-means,
  5. index.train(xb)                                  # 因此需要先训练index,再add向量
  6. index.add(xb)         
复制代码

2.2.2.4 IVFxPQy 倒排乘积量化


    优点:工业界大量使用此方法,各项指标都均可以接受,利用乘积量化的方法,改进了IVF的k-means,将一个向量的维度切成x段,每段分别进行k-means再检索。缺点:集百家之长,自然也集百家之短使用情况:一般来说,各方面没啥特殊的极端要求的话,最推荐使用该方法!参数:IVFx,PQy,其中的x和y同上构建方法:
  1. dim, measure = 64, faiss.METRIC_L2  
  2. param =  'IVF100,PQ16'
  3. index = faiss.index_factory(dim, param, measure)
  4. print(index.is_trained)                          # 此时输出为False,因为倒排索引需要训练k-means,
  5. index.train(xb)                                  # 因此需要先训练index,再add向量 index.add(xb)      
复制代码

2.2.2.5 LSH 局部敏感哈希


    原理:哈希对大家再熟悉不过,向量也可以采用哈希来加速查找,我们这里说的哈希指的是局部敏感哈希(Locality Sensitive Hashing,LSH),不同于传统哈希尽量不产生碰撞,局部敏感哈希依赖碰撞来查找近邻。高维空间的两点若距离很近,那么设计一种哈希函数对这两点进行哈希计算后分桶,使得他们哈希分桶值有很大的概率是一样的,若两点之间的距离较远,则他们哈希分桶值相同的概率会很小。优点:训练非常快,支持分批导入,index占内存很小,检索也比较快缺点:召回率非常拉垮。使用情况:候选向量库非常大,离线检索,内存资源比较稀缺的情况构建方法:
  1. dim, measure = 64, faiss.METRIC_L2  
  2. param =  'LSH'
  3. index = faiss.index_factory(dim, param, measure)
  4. print(index.is_trained)                          # 此时输出为True
  5. index.add(xb)      
复制代码

2.2.2.6 HNSWx


    优点:该方法为基于图检索的改进方法,检索速度极快,10亿级别秒出检索结果,而且召回率几乎可以媲美Flat,最高能达到惊人的97%。检索的时间复杂度为loglogn,几乎可以无视候选向量的量级了。并且支持分批导入,极其适合线上任务,毫秒级别体验。缺点:构建索引极慢,占用内存极大(是Faiss中最大的,大于原向量占用的内存大小)参数:HNSWx中的x为构建图时每个点最多连接多少个节点,x越大,构图越复杂,查询越精确,当然构建index时间也就越慢,x取4~64中的任何一个整数。使用情况:不在乎内存,并且有充裕的时间来构建index构建方法:
  1. dim, measure = 64, faiss.METRIC_L2   
  2. param =  'HNSW64'
  3. index = faiss.index_factory(dim, param, measure)  
  4. print(index.is_trained)                          # 此时输出为True
  5. index.add(xb)
复制代码

2.3 项目部署:langchain + ChatGLM-6B搭建本地知识库问答

注意,由于这个项目后面一直在更新迭代,所以按照下面的步骤不一定能跑起来了
如有需要,建议参考七月官网首页的「大模型项目开发线上营」第19课中的实操文档:搭建Langchain-Chatchat0.2.7的步骤试下,包括七月提供的GPU服务器中则用Langchain-Chatchat的conda环境
2.3.1 部署过程一:支持多种使用模式

其中的LLM模型可以根据实际业务的需求选定,本项目中用的ChatGLM-6B,其GitHub地址为:https://github.com/THUDM/ChatGLM-6B
ChatGLM-6B 是⼀个开源的、⽀持中英双语的对话语⾔模型,基于 General LanguageModel (GLM) 架构,具有 62 亿参数。结合模型量化技术,用户可以在消费级的显卡上进行本地部署(INT4 量化级别下最低只需 6GB 显存)
ChatGLM-6B 使用了和 ChatGPT 相似的技术,针对中文问答和对话进行了优化。经过约 1T 标识符的中英双语训练,辅以监督微调、反馈自助、人类反馈强化学习等技术的加持,62 亿参数的 ChatGLM-6B 已经能生成相当符合人类偏好的回答
  • 新建一个python3.8.13的环境(模型文件还是可以用的)  
    1. conda create -n langchain python==3.8.13
    复制代码

  • 拉取项目  
    1. git clone https://github.com/imClumsyPanda/langchain-ChatGLM.git
    复制代码

  • 进入目录  
    1. cd langchain-ChatGLM
    复制代码

  • 安装requirements.txt  
    1. conda activate langchain
    2. pip install -r requirements.txt
    复制代码

  • 当前环境支持装langchain的最高版本是0.0.166,无法安装0.0.174,就先装下0.0.166试下
    修改配置文件路径:  
    1. vi configs/model_config.py
    复制代码

  • 将chatglm-6b的路径设置成自己的  
    1. “chatglm-6b”: {
    2. “name”: “chatglm-6b”,
    3. “pretrained_model_name”: “/data/sim_chatgpt/chatglm-6b”,
    4. “local_model_path”: None,
    5. “provides”: “ChatGLM”
    复制代码

  • 修改要运行的代码文件:webui.py  
    1. vi webui.py
    复制代码

    将最后launch函数中的share设置为True,inbrowser设置为True
  • 执行webui.py文件  
    1. python webui.py
    复制代码

    可能是网络问题,无法创建一个公用链接。可以进行云服务器和本地端口的映射,参考:https://www.cnblogs.com/monologuesmw/p/14465117.html
对应输出:


​占用显存情况:大约15个G

2.3.2 部署过程二:支持多种社区上的在线体验

项目地址:https://github.com/thomas-yanxin/LangChain-ChatGLM-Webui
HUggingFace社区在线体验:https://huggingface.co/spaces/thomas-yanxin/LangChain-ChatLLM
另外也支持ModelScope魔搭社区、飞桨AIStudio社区等在线体验
  • 下载项目  
    1. git clone https://github.com/thomas-yanxin/LangChain-ChatGLM-Webui.git
    复制代码

  • 进入目录  
    1. cd LangChain-ChatGLM-Webui
    复制代码

  • 安装所需的包  
    1. pip install -r requirements.txt
    2. pip install gradio==3.10
    复制代码

  • 修改config.py  
    1. init_llm = "ChatGLM-6B"

    2. llm_model_dict = {
    3.     "chatglm": {
    4.         "ChatGLM-6B": "/data/sim_chatgpt/chatglm-6b",
    复制代码

  • 修改app.py文件,将launch函数中的share设置为True,inbrowser设置为True
    执行webui.py文件  
    1. python webui.py
    复制代码



​显存占用约13G


第三部分 逐行深入分析:langchain-ChatGLM(23年7月初版)项目的源码解读

再回顾一遍langchain-ChatGLM这个项目的架构图(图源)


​你会发现该项目主要由以下各大模块组成(注意,该项目的最新版已经变化很大,本第三部分可以认为是针对v0.1.15左右的版本,新版对很多功能做了更高的封装,而从原理理解的角度来说,看老版 更好理解些)




    chains: 工作链路实现,如 chains/local_doc_qa 实现了基于本地⽂档的问答实现configs:配置文件存储knowledge_base/content:用于存储上传的原始⽂件loader: 文档加载器的实现类models: llm的接⼝类与实现类,针对开源模型提供流式输出⽀持textsplitter: 文本切分的实现类vectorstores:用于存储向量库⽂件,即本地知识库本体..
接下来,为方便读者一目了然,更快理解

    我基本给“下面该项目中的每一行代码”都添加上了中文注释且为理解更顺畅,我解读各个代码文件夹的顺序是根据项目流程逐一展开的 (而非上图GitHub上各个代码文件夹的呈现顺序)
如有问题,可以随时留言评论
3.1 agent:custom_agent/bing_search

3.1.1 agent/custom_agent.py

  1. from langchain.agents import Tool          # 导入工具模块
  2. from langchain.tools import BaseTool       # 导入基础工具类
  3. from langchain import PromptTemplate, LLMChain      # 导入提示模板和语言模型链
  4. from agent.custom_search import DeepSearch          # 导入自定义搜索模块

  5. # 导入基础单动作代理,输出解析器,语言模型单动作代理和代理执行器
  6. from langchain.agents import BaseSingleActionAgent, AgentOutputParser, LLMSingleActionAgent, AgentExecutor   
  7. from typing import List, Tuple, Any, Union, Optional, Type      # 导入类型注释模块
  8. from langchain.schema import AgentAction, AgentFinish           # 导入代理动作和代理完成模式
  9. from langchain.prompts import StringPromptTemplate          # 导入字符串提示模板
  10. from langchain.callbacks.manager import CallbackManagerForToolRun      # 导入工具运行回调管理器
  11. from langchain.base_language import BaseLanguageModel       # 导入基础语言模型
  12. import re                                                   # 导入正则表达式模块

  13. # 定义一个代理模板字符串
  14. agent_template = """
  15. 你现在是一个{role}。这里是一些已知信息:
  16. {related_content}
  17. {background_infomation}
  18. {question_guide}:{input}

  19. {answer_format}
  20. """

  21. # 定义一个自定义提示模板类,继承自字符串提示模板
  22. class CustomPromptTemplate(StringPromptTemplate):
  23.     template: str          # 提示模板字符串
  24.     tools: List[Tool]      # 工具列表

  25.     # 定义一个格式化函数,根据提供的参数生成最终的提示模板
  26.     def format(self, **kwargs) -> str:
  27.         intermediate_steps = kwargs.pop("intermediate_steps")
  28.         # 判断是否有互联网查询信息
  29.         if len(intermediate_steps) == 0:
  30.             # 如果没有,则给出默认的背景信息,角色,问题指导和回答格式
  31.             background_infomation = "\n"
  32.             role = "傻瓜机器人"
  33.             question_guide = "我现在有一个问题"
  34.             answer_format = "如果你知道答案,请直接给出你的回答!如果你不知道答案,请你只回答"DeepSearch('搜索词')",并将'搜索词'替换为你认为需要搜索的关键词,除此之外不要回答其他任何内容。\n\n下面请回答我上面提出的问题!"

  35.         else:
  36.             # 否则,根据 intermediate_steps 中的 AgentAction 拼装 background_infomation
  37.             background_infomation = "\n\n你还有这些已知信息作为参考:\n\n"
  38.             action, observation = intermediate_steps[0]
  39.             background_infomation += f"{observation}\n"
  40.             role = "聪明的 AI 助手"
  41.             question_guide = "请根据这些已知信息回答我的问题"
  42.             answer_format = ""

  43.         kwargs["background_infomation"] = background_infomation
  44.         kwargs["role"] = role
  45.         kwargs["question_guide"] = question_guide
  46.         kwargs["answer_format"] = answer_format
  47.         return self.template.format(**kwargs)  # 格式化模板并返回

  48. # 定义一个自定义搜索工具类,继承自基础工具类
  49. class CustomSearchTool(BaseTool):
  50.     name: str = "DeepSearch"           # 工具名称
  51.     description: str = ""              # 工具描述

  52.     # 定义一个运行函数,接受一个查询字符串和一个可选的回调管理器作为参数,返回DeepSearch的搜索结果
  53.     def _run(self, query: str, run_manager: Optional[CallbackManagerForToolRun] = None):
  54.         return DeepSearch.search(query = query)

  55.     # 定义一个异步运行函数,但由于DeepSearch不支持异步,所以直接抛出一个未实现错误
  56.     async def _arun(self, query: str):
  57.         raise NotImplementedError("DeepSearch does not support async")

  58. # 定义一个自定义代理类,继承自基础单动作代理
  59. class CustomAgent(BaseSingleActionAgent):
  60.     # 定义一个输入键的属性
  61.     @property
  62.     def input_keys(self):
  63.         return ["input"]

  64.     # 定义一个计划函数,接受一组中间步骤和其他参数,返回一个代理动作或者代理完成
  65.     def plan(self, intermedate_steps: List[Tuple[AgentAction, str]],
  66.             **kwargs: Any) -> Union[AgentAction, AgentFinish]:
  67.         return AgentAction(tool="DeepSearch", tool_input=kwargs["input"], log="")

  68. # 定义一个自定义输出解析器,继承自代理输出解析器
  69. class CustomOutputParser(AgentOutputParser):
  70.     # 定义一个解析函数,接受一个语言模型的输出字符串,返回一个代理动作或者代理完成
  71.     def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
  72.         # 使用正则表达式匹配输出字符串,group1是调用函数名字,group2是传入参数
  73.         match = re.match(r'^[\s\w]*(DeepSearch)\(([^\)]+)\)', llm_output, re.DOTALL)
  74.         print(match)

  75.         # 如果语言模型没有返回 DeepSearch() 则认为直接结束指令
  76.         if not match:
  77.             return AgentFinish(
  78.                 return_values={"output": llm_output.strip()},
  79.                 log=llm_output,
  80.             )
  81.         # 否则的话都认为需要调用 Tool
  82.         else:
  83.             action = match.group(1).strip()
  84.             action_input = match.group(2).strip()
  85.             return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)


  86. # 定义一个深度代理类
  87. class DeepAgent:
  88.     tool_name: str = "DeepSearch"       # 工具名称
  89.     agent_executor: any                 # 代理执行器
  90.     tools: List[Tool]                   # 工具列表
  91.     llm_chain: any                      # 语言模型链

  92.     # 定义一个查询函数,接受一个相关内容字符串和一个查询字符串,返回执行器的运行结果
  93.     def query(self, related_content: str = "", query: str = ""):
  94.         tool_name =这段代码的主要目的是建立一个深度搜索的AI代理。AI代理首先通过接收一个问题输入,然后根据输入生成一个提示模板,然后通过该模板引导AI生成回答或进行更深入的搜索。现在,我将继续为剩余的代码添加中文注释

  95. ```python
  96.         self.tool_name
  97.         result = self.agent_executor.run(related_content=related_content, input=query ,tool_name=self.tool_name)
  98.         return result       # 返回执行器的运行结果

  99.     # 在初始化函数中,首先从DeepSearch工具创建一个工具实例,并添加到工具列表中
  100.     def __init__(self, llm: BaseLanguageModel, **kwargs):
  101.         tools = [
  102.                     Tool.from_function(
  103.                         func=DeepSearch.search,
  104.                         name="DeepSearch",
  105.                         description=""
  106.                     )
  107.                 ]
  108.         self.tools = tools      # 保存工具列表
  109.         tool_names = [tool.name for tool in tools]    # 提取工具列表中的工具名称
  110.         output_parser = CustomOutputParser()          # 创建一个自定义输出解析器实例
  111.         # 创建一个自定义提示模板实例
  112.         prompt = CustomPromptTemplate(template=agent_template,
  113.                                       tools=tools,
  114.                                       input_variables=["related_content","tool_name", "input", "intermediate_steps"])
  115.         # 创建一个语言模型链实例
  116.         llm_chain = LLMChain(llm=llm, prompt=prompt)
  117.         self.llm_chain = llm_chain      # 保存语言模型链实例

  118.         # 创建一个语言模型单动作代理实例
  119.         agent = LLMSingleActionAgent(
  120.             llm_chain=llm_chain,
  121.             output_parser=output_parser,
  122.             stop=["\nObservation:"],
  123.             allowed_tools=tool_names
  124.         )

  125.         # 创建一个代理执行器实例
  126.         agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True)
  127.         self.agent_executor = agent_executor         # 保存代理执行器实例
复制代码

3.1.2 agent/bing_search.py

  1. #coding=utf8
  2. # 声明文件编码格式为 utf8

  3. from langchain.utilities import BingSearchAPIWrapper
  4. # 导入 BingSearchAPIWrapper 类,这个类用于与 Bing 搜索 API 进行交互

  5. from configs.model_config import BING_SEARCH_URL, BING_SUBSCRIPTION_KEY
  6. # 导入配置文件中的 Bing 搜索 URL 和 Bing 订阅密钥

  7. def bing_search(text, result_len=3):
  8.     # 定义一个名为 bing_search 的函数,该函数接收一个文本和结果长度的参数,默认结果长度为3

  9.     if not (BING_SEARCH_URL and BING_SUBSCRIPTION_KEY):
  10.         # 如果 Bing 搜索 URL 或 Bing 订阅密钥未设置,则返回一个错误信息的文档
  11.         return [{"snippet": "please set BING_SUBSCRIPTION_KEY and BING_SEARCH_URL in os ENV",
  12.                  "title": "env inof not fould",
  13.                  "link": "https://python.langchain.com/en/latest/modules/agents/tools/examples/bing_search.html"}]

  14.     search = BingSearchAPIWrapper(bing_subscription_key=BING_SUBSCRIPTION_KEY,
  15.                                   bing_search_url=BING_SEARCH_URL)
  16.     # 创建 BingSearchAPIWrapper 类的实例,该实例用于与 Bing 搜索 API 进行交互

  17.     return search.results(text, result_len)
  18.     # 返回搜索结果,结果的数量由 result_len 参数决定

  19. if __name__ == "__main__":
  20.     # 如果这个文件被直接运行,而不是被导入作为模块,那么就执行以下代码

  21.     r = bing_search('python')
  22.     # 使用 Bing 搜索 API 来搜索 "python" 这个词,并将结果保存在变量 r 中

  23.     print(r)
  24.     # 打印出搜索结果
复制代码

3.2 models:包含models和文档加载器loader


    models: llm的接⼝类与实现类,针对开源模型提供流式输出⽀持loader: 文档加载器的实现类


3.2.1models/chatglm_llm.py

  1. from abc import ABC  # 导入抽象基类
  2. from langchain.llms.base import LLM           # 导入语言学习模型基类
  3. from typing import Optional, List             # 导入类型标注模块
  4. from models.loader import LoaderCheckPoint    # 导入模型加载点
  5. from models.base import (BaseAnswer,          # 导入基本回答模型
  6.                          AnswerResult)        # 导入回答结果模型


  7. class ChatGLM(BaseAnswer, LLM, ABC):  # 定义ChatGLM类,继承基础回答、语言学习模型和抽象基类
  8.     max_token: int = 10000          # 最大的token数
  9.     temperature: float = 0.01       # 温度参数,用于控制生成文本的随机性
  10.     top_p = 0.9  # 排序前0.9的token会被保留
  11.     checkPoint: LoaderCheckPoint = None  # 检查点模型
  12.     # history = []          # 历史记录
  13.     history_len: int = 10   # 历史记录长度

  14.     def __init__(self, checkPoint: LoaderCheckPoint = None):  # 初始化方法
  15.         super().__init__()  # 调用父类的初始化方法
  16.         self.checkPoint = checkPoint  # 赋值检查点模型

  17.     @property
  18.     def _llm_type(self) -> str:  # 定义只读属性_llm_type,返回语言学习模型的类型
  19.         return "ChatGLM"

  20.     @property
  21.     def _check_point(self) -> LoaderCheckPoint:  # 定义只读属性_check_point,返回检查点模型
  22.         return self.checkPoint

  23.     @property
  24.     def _history_len(self) -> int:  # 定义只读属性_history_len,返回历史记录的长度
  25.         return self.history_len

  26.     def set_history_len(self, history_len: int = 10) -> None:  # 设置历史记录长度
  27.         self.history_len = history_len

  28.     def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:  # 定义_call方法,实现模型的具体调用
  29.         print(f"__call:{prompt}")  # 打印调用的提示信息
  30.         response, _ = self.checkPoint.model.chat(  # 调用模型的chat方法,获取回答和其他信息
  31.             self.checkPoint.tokenizer,  # 使用的分词器
  32.             prompt,  # 提示信息
  33.             history=[],  # 历史记录
  34.             max_length=self.max_token,      # 最大长度
  35.             temperature=self.temperature    # 温度参数
  36.         )
  37.         print(f"response:{response}")  # 打印回答信息
  38.         print(f"+++++++++++++++++++++++++++++++++++")  # 打印分隔线
  39.         return response  # 返回回答

  40.     def generatorAnswer(self, prompt: str,
  41.                          history: List[List[str]] = [],
  42.                          streaming: bool = False):  # 定义生成回答的方法,可以处理流式输入

  43.         if streaming:  # 如果是流式输入
  44.             history += [[]]  # 在历史记录中添加新的空列表
  45.             for inum, (stream_resp, _) in enumerate(self.checkPoint.model.stream_chat(  # 对模型的stream_chat方法返回的结果进行枚举
  46.                     self.checkPoint.tokenizer,  # 使用的分词器
  47.                     prompt,  # 提示信息
  48.                     history=history[-self.history_len:-1] if self.history_len > 1 else [],  # 使用的历史记录
  49.                     max_length=self.max_token,  # 最大长度
  50.                     temperature=self.temperature  # 温度参数
  51.             )):
  52.                 # self.checkPoint.clear_torch_cache()  # 清空缓存
  53.                 history[-1] = [prompt, stream_resp]  # 更新最后一个历史记录
  54.                 answer_result = AnswerResult()  # 创建回答结果对象
  55.                 answer_result.history = history  # 更新回答结果的历史记录
  56.                 answer_result.llm_output = {"answer": stream_resp}  # 更新回答结果的输出
  57.                 yield answer_result  # 生成回答结果
  58.         else:  # 如果不是流式输入
  59.             response, _ = self.checkPoint.model.chat(  # 调用模型的chat方法,获取回答和其他信息
  60.                 self.checkPoint.tokenizer,  # 使用的分词器
  61.                 prompt,  # 提示信息
  62.                 history=history[-self.history_len:] if self.history_len > 0 else [],  # 使用的历史记录
  63.                 max_length=self.max_token,  # 最大长度
  64.                 temperature=self.temperature  # 温度参数
  65.             )
  66.             self.checkPoint.clear_torch_cache()  # 清空缓存
  67.             history += [[prompt, response]]  # 更新历史记录
  68.             answer_result = AnswerResult()  # 创建回答结果对象
  69.             answer_result.history = history  # 更新回答结果的历史记录
  70.             answer_result.llm_output = {"answer": response}  # 更新回答结果的输出
  71.             yield answer_result  # 生成回答结果
复制代码

3.2.2 models/shared.py

这个文件的作用是远程调用LLM
  1. import sys      # 导入sys模块,通常用于与Python解释器进行交互
  2. from typing import Any      # 从typing模块导入Any,用于表示任何类型

  3. # 从models.loader.args模块导入parser,可能是解析命令行参数用
  4. from models.loader.args import parser      
  5. # 从models.loader模块导入LoaderCheckPoint,可能是模型加载点
  6. from models.loader import LoaderCheckPoint  

  7. # 从configs.model_config模块导入llm_model_dict和LLM_MODEL
  8. from configs.model_config import (llm_model_dict, LLM_MODEL)  
  9. # 从models.base模块导入BaseAnswer,即模型的基础类
  10. from models.base import BaseAnswer  

  11. # 定义一个名为loaderCheckPoint的变量,类型为LoaderCheckPoint,并初始化为None
  12. loaderCheckPoint: LoaderCheckPoint = None  


  13. def loaderLLM(llm_model: str = None, no_remote_model: bool = False, use_ptuning_v2: bool = False) -> Any:
  14.     """
  15.     初始化 llm_model_ins LLM
  16.     :param llm_model: 模型名称
  17.     :param no_remote_model: 是否使用远程模型,如果需要加载本地模型,则添加 `--no-remote-model
  18.     :param use_ptuning_v2: 是否使用 p-tuning-v2 PrefixEncoder
  19.     :return:
  20.     """
  21.     pre_model_name = loaderCheckPoint.model_name      # 获取loaderCheckPoint的模型名称
  22.     llm_model_info = llm_model_dict[pre_model_name]   # 从模型字典中获取模型信息

  23.     if no_remote_model:      # 如果不使用远程模型
  24.         loaderCheckPoint.no_remote_model = no_remote_model  # 将loaderCheckPoint的no_remote_model设置为True
  25.     if use_ptuning_v2:       # 如果使用p-tuning-v2
  26.         loaderCheckPoint.use_ptuning_v2 = use_ptuning_v2    # 将loaderCheckPoint的use_ptuning_v2设置为True

  27.     if llm_model:            # 如果指定了模型名称
  28.         llm_model_info = llm_model_dict[llm_model]  # 从模型字典中获取指定的模型信息

  29.     if loaderCheckPoint.no_remote_model:  # 如果不使用远程模型
  30.         loaderCheckPoint.model_name = llm_model_info['name']  # 将loaderCheckPoint的模型名称设置为模型信息中的name
  31.     else:  # 如果使用远程模型
  32.         loaderCheckPoint.model_name = llm_model_info['pretrained_model_name']  # 将loaderCheckPoint的模型名称设置为模型信息中的pretrained_model_name

  33.     loaderCheckPoint.model_path = llm_model_info["local_model_path"]  # 设置模型的本地路径

  34.     if 'FastChatOpenAILLM' in llm_model_info["provides"]:  # 如果模型信息中的provides包含'FastChatOpenAILLM'
  35.         loaderCheckPoint.unload_model()  # 卸载模型
  36.     else:  # 如果不包含
  37.         loaderCheckPoint.reload_model()  # 重新加载模型

  38.     provides_class = getattr(sys.modules['models'], llm_model_info['provides'])  # 获取模型类
  39.     modelInsLLM = provides_class(checkPoint=loaderCheckPoint)  # 创建模型实例
  40.     if 'FastChatOpenAILLM' in llm_model_info["provides"]:      # 如果模型信息中的provides包含'FastChatOpenAILLM'
  41.         modelInsLLM.set_api_base_url(llm_model_info['api_base_url'])  # 设置API基础URL
  42.         modelInsLLM.call_model_name(llm_model_info['name'])    # 设置模型名称
  43.     return modelInsLLM  # 返回模型实例
复制代码

// 待更..
3.3 configs:配置文件存储model_config.py

  1. import torch.cuda
  2. import torch.backends
  3. import os
  4. import logging
  5. import uuid

  6. LOG_FORMAT = "%(levelname) -5s %(asctime)s" "-1d: %(message)s"
  7. logger = logging.getLogger()
  8. logger.setLevel(logging.INFO)
  9. logging.basicConfig(format=LOG_FORMAT)

  10. # 在以下字典中修改属性值,以指定本地embedding模型存储位置
  11. # 如将 "text2vec": "GanymedeNil/text2vec-large-chinese" 修改为 "text2vec": "User/Downloads/text2vec-large-chinese"
  12. # 此处请写绝对路径
  13. embedding_model_dict = {
  14.     "ernie-tiny": "nghuyong/ernie-3.0-nano-zh",
  15.     "ernie-base": "nghuyong/ernie-3.0-base-zh",
  16.     "text2vec-base": "shibing624/text2vec-base-chinese",
  17.     "text2vec": "GanymedeNil/text2vec-large-chinese",
  18.     "m3e-small": "moka-ai/m3e-small",
  19.     "m3e-base": "moka-ai/m3e-base",
  20. }

  21. # Embedding model name
  22. EMBEDDING_MODEL = "text2vec"

  23. # Embedding running device
  24. EMBEDDING_DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"


  25. # supported LLM models
  26. # llm_model_dict 处理了loader的一些预设行为,如加载位置,模型名称,模型处理器实例
  27. # 在以下字典中修改属性值,以指定本地 LLM 模型存储位置
  28. # 如将 "chatglm-6b" 的 "local_model_path" 由 None 修改为 "User/Downloads/chatglm-6b"
  29. # 此处请写绝对路径
  30. llm_model_dict = {
  31.     "chatglm-6b-int4-qe": {
  32.         "name": "chatglm-6b-int4-qe",
  33.         "pretrained_model_name": "THUDM/chatglm-6b-int4-qe",
  34.         "local_model_path": None,
  35.         "provides": "ChatGLM"
  36.     },
  37.     "chatglm-6b-int4": {
  38.         "name": "chatglm-6b-int4",
  39.         "pretrained_model_name": "THUDM/chatglm-6b-int4",
  40.         "local_model_path": None,
  41.         "provides": "ChatGLM"
  42.     },
  43.     "chatglm-6b-int8": {
  44.         "name": "chatglm-6b-int8",
  45.         "pretrained_model_name": "THUDM/chatglm-6b-int8",
  46.         "local_model_path": None,
  47.         "provides": "ChatGLM"
  48.     },
  49.     "chatglm-6b": {
  50.         "name": "chatglm-6b",
  51.         "pretrained_model_name": "THUDM/chatglm-6b",
  52.         "local_model_path": None,
  53.         "provides": "ChatGLM"
  54.     },
  55.     "chatglm2-6b": {
  56.         "name": "chatglm2-6b",
  57.         "pretrained_model_name": "THUDM/chatglm2-6b",
  58.         "local_model_path": None,
  59.         "provides": "ChatGLM"
  60.     },
  61.     "chatglm2-6b-int4": {
  62.         "name": "chatglm2-6b-int4",
  63.         "pretrained_model_name": "THUDM/chatglm2-6b-int4",
  64.         "local_model_path": None,
  65.         "provides": "ChatGLM"
  66.     },
  67.     "chatglm2-6b-int8": {
  68.         "name": "chatglm2-6b-int8",
  69.         "pretrained_model_name": "THUDM/chatglm2-6b-int8",
  70.         "local_model_path": None,
  71.         "provides": "ChatGLM"
  72.     },
  73.     "chatyuan": {
  74.         "name": "chatyuan",
  75.         "pretrained_model_name": "ClueAI/ChatYuan-large-v2",
  76.         "local_model_path": None,
  77.         "provides": None
  78.     },
  79.     "moss": {
  80.         "name": "moss",
  81.         "pretrained_model_name": "fnlp/moss-moon-003-sft",
  82.         "local_model_path": None,
  83.         "provides": "MOSSLLM"
  84.     },
  85.     "vicuna-13b-hf": {
  86.         "name": "vicuna-13b-hf",
  87.         "pretrained_model_name": "vicuna-13b-hf",
  88.         "local_model_path": None,
  89.         "provides": "LLamaLLM"
  90.     },

  91.     # 通过 fastchat 调用的模型请参考如下格式
  92.     "fastchat-chatglm-6b": {
  93.         "name": "chatglm-6b",             # "name"修改为fastchat服务中的"model_name"
  94.         "pretrained_model_name": "chatglm-6b",
  95.         "local_model_path": None,
  96.         "provides": "FastChatOpenAILLM",  # 使用fastchat api时,需保证"provides"为"FastChatOpenAILLM"
  97.         "api_base_url": "http://localhost:8000/v1"  # "name"修改为fastchat服务中的"api_base_url"
  98.     },
  99.     "fastchat-chatglm2-6b": {
  100.         "name": "chatglm2-6b",              # "name"修改为fastchat服务中的"model_name"
  101.         "pretrained_model_name": "chatglm2-6b",
  102.         "local_model_path": None,
  103.         "provides": "FastChatOpenAILLM",    # 使用fastchat api时,需保证"provides"为"FastChatOpenAILLM"
  104.         "api_base_url": "http://localhost:8000/v1"  # "name"修改为fastchat服务中的"api_base_url"
  105.     },

  106.     # 通过 fastchat 调用的模型请参考如下格式
  107.     "fastchat-vicuna-13b-hf": {
  108.         "name": "vicuna-13b-hf",          # "name"修改为fastchat服务中的"model_name"
  109.         "pretrained_model_name": "vicuna-13b-hf",
  110.         "local_model_path": None,
  111.         "provides": "FastChatOpenAILLM",  # 使用fastchat api时,需保证"provides"为"FastChatOpenAILLM"
  112.         "api_base_url": "http://localhost:8000/v1"  # "name"修改为fastchat服务中的"api_base_url"
  113.     },
  114. }

  115. # LLM 名称
  116. LLM_MODEL = "chatglm-6b"
  117. # 量化加载8bit 模型
  118. LOAD_IN_8BIT = False
  119. # Load the model with bfloat16 precision. Requires NVIDIA Ampere GPU.
  120. BF16 = False
  121. # 本地lora存放的位置
  122. LORA_DIR = "loras/"

  123. # LLM lora path,默认为空,如果有请直接指定文件夹路径
  124. LLM_LORA_PATH = ""
  125. USE_LORA = True if LLM_LORA_PATH else False

  126. # LLM streaming reponse
  127. STREAMING = True

  128. # Use p-tuning-v2 PrefixEncoder
  129. USE_PTUNING_V2 = False

  130. # LLM running device
  131. LLM_DEVICE = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

  132. # 知识库默认存储路径
  133. KB_ROOT_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "knowledge_base")

  134. # 基于上下文的prompt模版,请务必保留"{question}"和"{context}"
  135. PROMPT_TEMPLATE = """已知信息:
  136. {context}

  137. 根据上述已知信息,简洁和专业的来回答用户的问题。如果无法从中得到答案,请说 “根据已知信息无法回答该问题” 或 “没有提供足够的相关信息”,不允许在答案中添加编造成分,答案请使用中文。 问题是:{question}"""

  138. # 缓存知识库数量,如果是ChatGLM2,ChatGLM2-int4,ChatGLM2-int8模型若检索效果不好可以调成’10’
  139. CACHED_VS_NUM = 1

  140. # 文本分句长度
  141. SENTENCE_SIZE = 100

  142. # 匹配后单段上下文长度
  143. CHUNK_SIZE = 250

  144. # 传入LLM的历史记录长度
  145. LLM_HISTORY_LEN = 3

  146. # 知识库检索时返回的匹配内容条数
  147. VECTOR_SEARCH_TOP_K = 5

  148. # 知识检索内容相关度 Score, 数值范围约为0-1100,如果为0,则不生效,经测试设置为小于500时,匹配结果更精准
  149. VECTOR_SEARCH_SCORE_THRESHOLD = 0

  150. NLTK_DATA_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "nltk_data")

  151. FLAG_USER_NAME = uuid.uuid4().hex

  152. logger.info(f"""
  153. loading model config
  154. llm device: {LLM_DEVICE}
  155. embedding device: {EMBEDDING_DEVICE}
  156. dir: {os.path.dirname(os.path.dirname(__file__))}
  157. flagging username: {FLAG_USER_NAME}
  158. """)

  159. # 是否开启跨域,默认为False,如果需要开启,请设置为True
  160. # is open cross domain
  161. OPEN_CROSS_DOMAIN = False

  162. # Bing 搜索必备变量
  163. # 使用 Bing 搜索需要使用 Bing Subscription Key,需要在azure port中申请试用bing search
  164. # 具体申请方式请见
  165. # https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/create-bing-search-service-resource
  166. # 使用python创建bing api 搜索实例详见:
  167. # https://learn.microsoft.com/en-us/bing/search-apis/bing-web-search/quickstarts/rest/python
  168. BING_SEARCH_URL = "https://api.bing.microsoft.com/v7.0/search"
  169. # 注意不是bing Webmaster Tools的api key,

  170. # 此外,如果是在服务器上,报Failed to establish a new connection: [Errno 110] Connection timed out
  171. # 是因为服务器加了防火墙,需要联系管理员加白名单,如果公司的服务器的话,就别想了GG
  172. BING_SUBSCRIPTION_KEY = ""

  173. # 是否开启中文标题加强,以及标题增强的相关配置
  174. # 通过增加标题判断,判断哪些文本为标题,并在metadata中进行标记;
  175. # 然后将文本与往上一级的标题进行拼合,实现文本信息的增强。
  176. ZH_TITLE_ENHANCE = False
复制代码

3.4 loader:文档加载与text转换

3.4.1 loader/pdf_loader.py

  1. # 导入类型提示模块,用于强化代码的可读性和健壮性
  2. from typing import List

  3. # 导入UnstructuredFileLoader,这是一个从非结构化文件中加载文档的类
  4. from langchain.document_loaders.unstructured import UnstructuredFileLoader

  5. # 导入PaddleOCR,这是一个开源的OCR工具,用于从图片中识别和读取文字
  6. from paddleocr import PaddleOCR

  7. # 导入os模块,用于处理文件和目录
  8. import os

  9. # 导入fitz模块,用于处理PDF文件
  10. import fitz

  11. # 导入nltk模块,用于处理文本数据
  12. import nltk

  13. # 导入模型配置文件中的NLTK_DATA_PATH,这是nltk数据的路径
  14. from configs.model_config import NLTK_DATA_PATH

  15. # 设置nltk数据的路径,将模型配置中的路径添加到nltk的数据路径中
  16. nltk.data.path = [NLTK_DATA_PATH] + nltk.data.path

  17. # 定义一个类,UnstructuredPaddlePDFLoader,该类继承自UnstructuredFileLoader
  18. class UnstructuredPaddlePDFLoader(UnstructuredFileLoader):

  19.     # 定义一个内部方法_get_elements,返回一个列表
  20.     def _get_elements(self) -> List:

  21.         # 定义一个内部函数pdf_ocr_txt,用于从pdf中进行OCR并输出文本文件
  22.         def pdf_ocr_txt(filepath, dir_path="tmp_files"):
  23.             # 将dir_path与filepath的目录部分合并成一个新的路径
  24.             full_dir_path = os.path.join(os.path.dirname(filepath), dir_path)

  25.             # 如果full_dir_path对应的目录不存在,则创建这个目录
  26.             if not os.path.exists(full_dir_path):
  27.                 os.makedirs(full_dir_path)
  28.             
  29.             # 创建一个PaddleOCR实例,设置一些参数
  30.             ocr = PaddleOCR(use_angle_cls=True, lang="ch", use_gpu=False, show_log=False)

  31.             # 打开pdf文件
  32.             doc = fitz.open(filepath)

  33.             # 创建一个txt文件的路径
  34.             txt_file_path = os.path.join(full_dir_path, f"{os.path.split(filepath)[-1]}.txt")

  35.             # 创建一个临时的图片文件路径
  36.             img_name = os.path.join(full_dir_path, 'tmp.png')

  37.             # 打开txt_file_path对应的文件,并以写模式打开
  38.             with open(txt_file_path, 'w', encoding='utf-8') as fout:
  39.                 # 遍历pdf的所有页面
  40.                 for i in range(doc.page_count):
  41.                     # 获取当前页面
  42.                     page = doc[i]

  43.                     # 获取当前页面的文本内容,并写入txt文件
  44.                     text = page.get_text("")
  45.                     fout.write(text)
  46.                     fout.write("\n")

  47.                     # 获取当前页面的所有图片
  48.                     img_list = page.get_images()

  49.                     # 遍历所有图片
  50.                     for img in img_list:
  51.                         # 将图片转换为Pixmap对象
  52.                         pix = fitz.Pixmap(doc, img[0])

  53.                         # 如果图片有颜色信息,则将其转换为RGB格式
  54.                         if pix.n - pix.alpha >= 4:
  55.                             pix = fitz.Pixmap(fitz.csRGB, pix)
  56.                         
  57.                         # 保存图片
  58.                         pix.save(img_name)

  59.                         # 对图片进行OCR识别
  60.                         result = ocr.ocr(img_name)

  61.                         # 从OCR结果中提取文本,并写入txt文件
  62.                         ocr_result = [i[1][0] for line in result for i in line]
  63.                         fout.write("\n".join(ocr_result))
  64.             
  65.             # 如果图片文件存在,则删除它
  66.             if os.path.exists(img_name):
  67.                 os.remove(img_name)
  68.             
  69.             # 返回txt文件的路径
  70.             return txt_file_path

  71.         # 调用上面定义的函数,获取txt文件的路径
  72.         txt_file_path = pdf_ocr_txt(self.file_path)

  73.         # 导入partition_text函数,该函数用于将文本文件分块
  74.         from unstructured.partition.text import partition_text

  75.         # 对txt文件进行分块,并返回分块结果
  76.         return partition_text(filename=txt_file_path, **self.unstructured_kwargs)

  77. # 运行入口
  78. if __name__ == "__main__":
  79.     # 导入sys模块,用于操作Python的运行环境
  80.     import sys

  81.     # 将当前文件的上一级目录添加到Python的搜索路径中
  82.     sys.path.append(os.path.dirname(os.path.dirname(__file__)))

  83.     # 定义一个pdf文件的路径
  84.     filepath = os.path.join(os.path.dirname(os.path.dirname(__file__)), "knowledge_base", "samples", "content", "test.pdf")

  85.     # 创建一个UnstructuredPaddlePDFLoader的实例
  86.     loader = UnstructuredPaddlePDFLoader(filepath, mode="elements")

  87.     # 加载文档
  88.     docs = loader.load()

  89.     # 遍历并打印所有文档
  90.     for doc in docs:
  91.         print(doc)
复制代码

// 待更..
3.5 textsplitter:文档切分

3.5.1 textsplitter/ali_text_splitter.py

ali_text_splitter.py的代码如下所示
  1. # 导入CharacterTextSplitter模块,用于文本切分
  2. from langchain.text_splitter import CharacterTextSplitter  
  3. import re                  # 导入正则表达式模块,用于文本匹配和替换
  4. from typing import List    # 导入List类型,用于指定返回的数据类型

  5. # 定义一个新的类AliTextSplitter,继承自CharacterTextSplitter
  6. class AliTextSplitter(CharacterTextSplitter):  
  7.     # 类的初始化函数,如果参数pdf为True,那么使用pdf文本切分规则,否则使用默认规则
  8.     def __init__(self, pdf: bool = False, **kwargs):  
  9.         # 调用父类的初始化函数,接收传入的其他参数
  10.         super().__init__(**kwargs)  
  11.         self.pdf = pdf          # 将pdf参数保存为类的成员变量

  12.     # 定义文本切分方法,输入参数为一个字符串,返回值为字符串列表
  13.     def split_text(self, text: str) -> List[str]:  
  14.         if self.pdf:            # 如果pdf参数为True,那么对文本进行预处理

  15.             # 替换掉连续的3个及以上的换行符为一个换行符
  16.             text = re.sub(r"\n{3,}", r"\n", text)  
  17.             # 将所有的空白字符(包括空格、制表符、换页符等)替换为一个空格
  18.             text = re.sub('\s', " ", text)  
  19.             # 将连续的两个换行符替换为一个空字符
  20.             text = re.sub("\n\n", "", text)  
  21.         
  22.         # 导入pipeline模块,用于创建一个处理流程
  23.         from modelscope.pipelines import pipeline  

  24.         # 创建一个document-segmentation任务的处理流程
  25.         # 用的模型为damo/nlp_bert_document-segmentation_chinese-base,计算设备为cpu
  26.         p = pipeline(
  27.             task="document-segmentation",
  28.             model='damo/nlp_bert_document-segmentation_chinese-base',
  29.             device="cpu")
  30.         result = p(documents=text)    # 对输入的文本进行处理,返回处理结果
  31.         sent_list = [i for i in result["text"].split("\n\t") if i]  # 将处理结果按照换行符和制表符进行切分,得到句子列表
  32.         return sent_list              # 返回句子列表
复制代码

其中,有三点值得注意下

    参数use_document_segmentation指定是否用语义切分文档
    此处采取的文档语义分割模型为达摩院开源的:nlp_bert_document-segmentation_chinese-base  (这是其论文)
  • 另,如果使用模型进行文档语义切分,那么需要安装:  
    1. modelscope[nlp]:pip install "modelscope[nlp]" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html
    复制代码

    且考虑到使用了三个模型,可能对于低配置gpu不太友好,因此这里将模型load进cpu计算,有需要的话可以替换device为自己的显卡id
3.6 knowledge_base:存储用户上传的文件并向量化

knowledge_bas下面有两个文件,一个content 即用户上传的原始文件,vector_store则用于存储向量库⽂件,即本地知识库本体,因为content因人而异 谁上传啥就是啥 所以没啥好分析,而vector_store下面则有两个文件,一个index.faiss,一个index.pkl
3.7 chains:向量搜索/匹配

如之前所述,本节开头图中“FAISS索引、FAISS搜索”中的“FAISS”是Facebook AI推出的一种用于有效搜索大规模高维向量空间中相似度的库,在大规模数据集中快速找到与给定向量最相似的向量是很多AI应用的重要组成部分,例如在推荐系统、自然语言处理、图像检索等领域
3.7.1 chains/modules /vectorstores.py文件:根据查询向量query在向量数据库中查找与query相似的文本向量

主要是关于

    FAISS (Facebook AI Similarity Search)的使用,具体体现在max_marginal_relevance_search_by_vector 中(如下图的最上面部分)
  • 以及一个FAISS向量存储类(FAISSVS,FAISSVS类继承自FAISS类)的定义,包含两个方法
    一个 max_marginal_relevance_search (如下图的中间部分,其最后会调用上面的max_marginal_relevance_search_by_vector)
    一个 __from (如下图的最下面部分)   
接下来,我们逐一分析下这几个函数
以上就是这段代码的主要内容,通过使用FAISS和MMR,它可以帮助我们在大量文本中找到与给定查询最相关的文本
3.7.2 chains /local_doc_qa.py代码文件:向量搜索


    导入包和模块
    代码开始的部分是一系列的导入语句,导入了必要的 Python 包和模块,包括文件加载器,文本分割器,模型配置,以及一些 Python 内建模块和其他第三方库改写 HuggingFaceEmbeddings 类的哈希方法
    代码定义了一个名为 _embeddings_hash 的函数,并将其赋值给 HuggingFaceEmbeddings 类的 __hash__ 方法。这样做的目的是使 HuggingFaceEmbeddings 对象可以被哈希,即可以作为字典的键或者被加入到集合中载入向量存储器
    定义了一个名为 load_vector_store 的函数,这个函数用于从本地加载一个向量存储器,返回 FAISS 类的对象。其中使用了 lru_cache 装饰器,可以缓存最近使用的 CACHED_VS_NUM 个结果,提高代码效率文件树遍历
    tree 函数是一个递归函数,用于遍历指定目录下的所有文件,返回一个包含所有文件的完整路径和文件名的列表。它可以忽略指定的文件或目录加载文件:
    load_file 函数根据文件后缀名选择合适的加载器和文本分割器,加载并分割文件生成提醒:
    generate_prompt 函数用于根据相关文档和查询生成一个提醒。提醒的模板由 prompt_template 参数提供
  • 创建文档列表
    search_result2docs
    1. # 创建一个空列表,用于存储文档
    2. def search_result2docs(search_results):
    3.     docs = []

    4.     # 对于搜索结果中的每一项
    5.     for result in search_results:
    6.         # 创建一个文档对象
    7.         # 如果结果中包含"snippet"关键字,则其值作为页面内容,否则页面内容为空字符串
    8.         # 如果结果中包含"link"关键字,则其值作为元数据中的源链接,否则源链接为空字符串
    9.         # 如果结果中包含"title"关键字,则其值作为元数据中的文件名,否则文件名为空字符串
    10.         doc = Document(page_content=result["snippet"] if "snippet" in result.keys() else "",
    11.                        metadata={"source": result["link"] if "link" in result.keys() else "",
    12.                                  "filename": result["title"] if "title" in result.keys() else ""})

    13.         # 将创建的文档对象添加到列表中
    14.         docs.append(doc)
    15.    
    16.     # 返回文档列表
    17.     return docs
    复制代码

之后,定义了一个名为 LocalDocQA 的类,主要用于基于文档的问答任务。基于文档的问答任务的主要功能是,根据一组给定的文档(这里被称为知识库)以及用户输入的问题,返回一个答案,LocalDocQA 类的主要方法包括:

    init_cfg():此方法初始化一些变量,包括将 llm_model(一个语言模型用于生成答案)分配给 self.llm,将一个基于HuggingFace的嵌入模型分配给 self.embeddings,将输入参数 top_k 分配给 self.top_kinit_knowledge_vector_store():此方法负责初始化知识向量库。它首先检查输入的文件路径,对于路径中的每个文件,将文件内容加载到 Document 对象中,然后将这些文档转换为嵌入向量,并将它们存储在向量库中one_knowledge_add():此方法用于向知识库中添加一个新的知识文档。它将输入的标题和内容创建为一个 Document 对象,然后将其转换为嵌入向量,并添加到向量库中get_knowledge_based_answer():此方法是基于给定的知识库和用户输入的问题,来生成一个答案。它首先根据用户输入的问题找到知识库中最相关的文档,然后生成一个包含相关文档和用户问题的提示,将提示传递给 llm_model 来生成答案
    且注意一点,这个函数调用了上面已经实现好的:similarity_search_with_scoreget_knowledge_based_conent_test():此方法是为了测试的,它将返回与输入查询最相关的文档和查询提示
        # query      查询内容
         # vs_path    知识库路径
         # chunk_conent   是否启用上下文关联
         # score_threshold    搜索匹配score阈值
         # vector_search_top_k   搜索知识库内容条数,默认搜索5条结果
         # chunk_sizes    匹配单段内容的连接上下文长度
         def get_knowledge_based_conent_test(self, query, vs_path, chunk_conent,
                                             score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD,
                                             vector_search_top_k=VECTOR_SEARCH_TOP_K, chunk_size=CHUNK_SIZE):
  • get_search_result_based_answer():此方法与 get_knowledge_based_answer() 类似,不过这里使用的是 bing_search 的结果作为知识库  
    1. def get_search_result_based_answer(self, query, chat_history=[], streaming: bool = STREAMING):
    2.     # 对查询进行 Bing 搜索,并获取搜索结果
    3.     results = bing_search(query)

    4.     # 将搜索结果转化为文本的形式
    5.     result_docs = search_result2docs(results)

    6.     # 生成用于提问的提示语
    7.     prompt = generate_prompt(result_docs, query)

    8.     # 通过 LLM(长语言模型)生成回答
    9.     for answer_result in self.llm.generatorAnswer(prompt=prompt, history=chat_history,
    10.                                                   streaming=streaming):
    11.         # 获取回答的文本
    12.         resp = answer_result.llm_output["answer"]

    13.         # 获取聊天历史
    14.         history = answer_result.history

    15.         # 将聊天历史中的最后一项的提问替换为当前的查询
    16.         history[-1][0] = query

    17.         # 组装回答的结果
    18.         response = {"query": query,
    19.                     "result": resp,
    20.                     "source_documents": result_docs}

    21.         # 返回回答的结果和聊天历史
    22.         yield response, history
    复制代码

    如你所见,这个函数和上面那个函数的主要区别在于,这个函数是直接利用搜索引擎的搜索结果来生成回答的,而上面那个函数是通过查询相似度搜索来找到最相关的文本,然后基于这些文本生成回答的
    而这个bing_search则在3.1.2节中已经定义
  • 接下来是分别用于从向量存储中删除文件、更新文件以及列出文件的三个方法
    delete_file_from_vector_store
    update_file_from_vector_store
    list_file_from_vector_store
    1.     # 删除向量存储中的文件
    2.     def delete_file_from_vector_store(self,
    3.                                       filepath: str or List[str],  # 文件路径,可以是单个文件或多个文件列表
    4.                                       vs_path):      # 向量存储路径
    5.         vector_store = load_vector_store(vs_path, self.embeddings)  # 从给定路径加载向量存储
    6.         status = vector_store.delete_doc(filepath)   # 删除指定文件
    7.         return status  # 返回删除状态

    8.     # 更新向量存储中的文件
    9.     def update_file_from_vector_store(self,
    10.                                       filepath: str or List[str],  # 需要更新的文件路径,可以是单个文件或多个文件列表
    11.                                       vs_path,  # 向量存储路径
    12.                                       docs: List[Document],):      # 需要更新的文件内容,文件以文档形式给出
    13.         vector_store = load_vector_store(vs_path, self.embeddings)  # 从给定路径加载向量存储
    14.         status = vector_store.update_doc(filepath, docs)  # 更新指定文件
    15.         return status  # 返回更新状态

    16.     # 列出向量存储中的文件
    17.     def list_file_from_vector_store(self,
    18.                                     vs_path,  # 向量存储路径
    19.                                     fullpath=False):  # 是否返回完整路径,如果为 False,则只返回文件名
    20.         vector_store = load_vector_store(vs_path, self.embeddings)  # 从给定路径加载向量存储
    21.         docs = vector_store.list_docs()      # 列出所有文件
    22.         if fullpath:  # 如果需要完整路径
    23.             return docs  # 返回完整路径列表
    24.         else:  # 如果只需要文件名
    25.             return [os.path.split(doc)[-1] for doc in docs]  # 用 os.path.split 将路径和文件名分离,只返回文件名列表
    复制代码

__main__部分的代码是 LocalDocQA 类的实例化和使用示例

    它首先初始化了一个 llm_model_ins 对象然后创建了一个 LocalDocQA 的实例并调用其 init_cfg() 方法进行初始化之后,它指定了一个查询和知识库的路径然后调用 get_knowledge_based_answer() 或 get_search_result_based_answer() 方法获取基于该查询的答案,并打印出答案和来源文档的信息
3.7.3 chains/text_load.py

chain这个文件夹下 还有最后一个项目文件(langchain-ChatGLM/text_load.py at master · imClumsyPanda/langchain-ChatGLM · GitHub),如下所示
  1. import os
  2. import pinecone
  3. from tqdm import tqdm
  4. from langchain.llms import OpenAI
  5. from langchain.text_splitter import SpacyTextSplitter
  6. from langchain.document_loaders import TextLoader
  7. from langchain.document_loaders import DirectoryLoader
  8. from langchain.indexes import VectorstoreIndexCreator
  9. from langchain.embeddings.openai import OpenAIEmbeddings
  10. from langchain.vectorstores import Pinecone

  11. #一些配置文件
  12. openai_key="你的key" # 注册 openai.com 后获得
  13. pinecone_key="你的key" # 注册 app.pinecone.io 后获得
  14. pinecone_index="你的库" #app.pinecone.io 获得
  15. pinecone_environment="你的Environment"  # 登录pinecone后,在indexes页面 查看Environment
  16. pinecone_namespace="你的Namespace" #如果不存在自动创建

  17. #科学上网你懂得
  18. os.environ['HTTP_PROXY'] = 'http://127.0.0.1:7890'
  19. os.environ['HTTPS_PROXY'] = 'http://127.0.0.1:7890'

  20. #初始化pinecone
  21. pinecone.init(
  22.     api_key=pinecone_key,
  23.     environment=pinecone_environment
  24. )
  25. index = pinecone.Index(pinecone_index)

  26. #初始化OpenAI的embeddings
  27. embeddings = OpenAIEmbeddings(openai_api_key=openai_key)

  28. #初始化text_splitter
  29. text_splitter = SpacyTextSplitter(pipeline='zh_core_web_sm',chunk_size=1000,chunk_overlap=200)

  30. # 读取目录下所有后缀是txt的文件
  31. loader = DirectoryLoader('../docs', glob="**/*.txt", loader_cls=TextLoader)

  32. #读取文本文件
  33. documents = loader.load()

  34. # 使用text_splitter对文档进行分割
  35. split_text = text_splitter.split_documents(documents)
  36. try:
  37.         for document in tqdm(split_text):
  38.                 # 获取向量并储存到pinecone
  39.                 Pinecone.from_documents([document], embeddings, index_name=pinecone_index)
  40. except Exception as e:
  41.     print(f"Error: {e}")
  42.     quit()
复制代码

3.8 vectorstores:MyFAISS.py

两个文件,一个__init__.py (就一行代码:from .MyFAISS import MyFAISS),另一个MyFAISS.py,如下代码所示
  1. # 从langchain.vectorstores库导入FAISS
  2. from langchain.vectorstores import FAISS
  3. # 从langchain.vectorstores.base库导入VectorStore            
  4. from langchain.vectorstores.base import VectorStore
  5. # 从langchain.vectorstores.faiss库导入dependable_faiss_import
  6. from langchain.vectorstores.faiss import dependable_faiss_import  

  7. from typing import Any, Callable, List, Dict  # 导入类型检查库
  8. from langchain.docstore.base import Docstore  # 从langchain.docstore.base库导入Docstore

  9. # 从langchain.docstore.document库导入Document
  10. from langchain.docstore.document import Document  

  11. import numpy as np      # 导入numpy库,用于科学计算
  12. import copy             # 导入copy库,用于数据复制
  13. import os               # 导入os库,用于操作系统相关的操作
  14. from configs.model_config import *  # 从configs.model_config库导入所有内容


  15. # 定义MyFAISS类,继承自FAISS和VectorStore两个父类
  16. class MyFAISS(FAISS, VectorStore):
复制代码

接下来,逐一实现以下函数
3.8.1 定义类的初始化函数:__init__

  1.     # 定义类的初始化函数
  2.     def __init__(
  3.             self,
  4.             embedding_function: Callable,
  5.             index: Any,
  6.             docstore: Docstore,
  7.             index_to_docstore_id: Dict[int, str],
  8.             normalize_L2: bool = False,
  9.     ):
  10.         # 调用父类FAISS的初始化函数
  11.         super().__init__(embedding_function=embedding_function,
  12.                          index=index,
  13.                          docstore=docstore,
  14.                          index_to_docstore_id=index_to_docstore_id,
  15.                          normalize_L2=normalize_L2)
  16.         # 初始化分数阈值
  17.         self.score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD
  18.         # 初始化块大小
  19.         self.chunk_size = CHUNK_SIZE
  20.         # 初始化块内容
  21.         self.chunk_conent = False
复制代码

3.8.2 seperate_list:将一个列表分解成多个子列表

  1.     # 定义函数seperate_list,将一个列表分解成多个子列表,每个子列表中的元素在原列表中是连续的
  2.     def seperate_list(self, ls: List[int]) -> List[List[int]]:
  3.         # TODO: 增加是否属于同一文档的判断
  4.         lists = []
  5.         ls1 = [ls[0]]
  6.         for i in range(1, len(ls)):
  7.             if ls[i - 1] + 1 == ls[i]:
  8.                 ls1.append(ls[i])
  9.             else:
  10.                 lists.append(ls1)
  11.                 ls1 = [ls[i]]
  12.         lists.append(ls1)
  13.         return lists
复制代码

3.8.3 similarity_search_with_score_by_vector,根据输入的向量,查找最接近的k个文本

similarity_search_with_score_by_vector 函数用于通过向量进行相似度搜索,返回与给定嵌入向量最相似的文本和对应的分数
不过,这个函数考虑的细节比较多,所以代码长度比较长,为方便大家更好的理解,我把这个函数拆分成5段逐一解释说明
3.8.4 delete_doc方法:删除文本库中指定来源的文本

  1.     #定义了一个名为 delete_doc 的方法,这个方法用于删除文本库中指定来源的文本
  2.     def delete_doc(self, source: str or List[str]):
  3.         # 使用 try-except 结构捕获可能出现的异常
  4.         try:
  5.             # 如果 source 是字符串类型
  6.             if isinstance(source, str):
  7.                 # 找出文本库中所有来源等于 source 的文本的id
  8.                 ids = [k for k, v in self.docstore._dict.items() if v.metadata["source"] == source]

  9.                 # 获取向量存储的路径
  10.                 vs_path = os.path.join(os.path.split(os.path.split(source)[0])[0], "vector_store")

  11.             # 如果 source 是列表类型
  12.             else:
  13.                 # 找出文本库中所有来源在 source 列表中的文本的id
  14.                 ids = [k for k, v in self.docstore._dict.items() if v.metadata["source"] in source]

  15.                 # 获取向量存储的路径
  16.                 vs_path = os.path.join(os.path.split(os.path.split(source[0])[0])[0], "vector_store")

  17.             # 如果没有找到要删除的文本,返回失败信息
  18.             if len(ids) == 0:
  19.                 return f"docs delete fail"

  20.             # 如果找到了要删除的文本
  21.             else:
  22.                 # 遍历所有要删除的文本id
  23.                 for id in ids:
  24.                     # 获取该id在索引中的位置
  25.                     index = list(self.index_to_docstore_id.keys())[list(self.index_to_docstore_id.values()).index(id)]

  26.                     # 从索引中删除该id
  27.                     self.index_to_docstore_id.pop(index)

  28.                     # 从文本库中删除该id对应的文本
  29.                     self.docstore._dict.pop(id)

  30.                 # TODO: 从 self.index 中删除对应id,这是一个未完成的任务
  31.                 # self.index.reset()
  32.                 # 保存当前状态到本地
  33.                 self.save_local(vs_path)

  34.                 # 返回删除成功的信息
  35.                 return f"docs delete success"

  36.         # 捕获异常
  37.         except Exception as e:
  38.             # 打印异常信息
  39.             print(e)
  40.             # 返回删除失败的信息
  41.             return f"docs delete fail"
复制代码

3.8.5 update_doc和lists_doc

  1.    # 定义了一个名为 update_doc 的方法,这个方法用于更新文档库中的文档
  2.     def update_doc(self, source, new_docs):
  3.         # 使用 try-except 结构捕获可能出现的异常
  4.         try:
  5.             # 删除旧的文档
  6.             delete_len = self.delete_doc(source)

  7.             # 添加新的文档
  8.             ls = self.add_documents(new_docs)

  9.             # 返回更新成功的信息
  10.             return f"docs update success"
  11.         # 捕获异常
  12.         except Exception as e:
  13.             # 打印异常信息
  14.             print(e)

  15.             # 返回更新失败的信息
  16.             return f"docs update fail"

  17.     # 定义了一个名为 list_docs 的方法,这个方法用于列出文档库中所有文档的来源
  18.     def list_docs(self):
  19.         # 遍历文档库中的所有文档,取出每个文档的来源,转换为集合,再转换为列表,最后返回这个列表
  20.         return list(set(v.metadata["source"] for v in self.docstore._dict.values()))
复制代码


第四部分 23年9月升级版Langchain-Chatchat的源码解析

23年9月,原项目LangChain + ChatGLM-6B做了升级,变成如今的Langchain-Chatchat项目


​其主要更新体现在增加了一个sever的文件夹,该文件夹包括


    chat,包含
    __init__.py
    chat.py
    knowledge_base_chat.py
    openai_chat.py
    search_engine_chat.py
    utils.pydb,包含
    models
        __init__.py
         base.py
         knowledge_base_model.py (即KnowledgeBaseModel的实现,下文4.2.1节阐述)
    knowledge_file_model.py (即KnowledgeFile的实现,下文4.2.2节阐述)

    repository
        __init__.py
         knowledge_base_repository.py (涉及add_kb_to_db的实现,下文4.3.1节阐述)
         knowledge_file_repository.py (涉及add_flie_to_db/add_docs_to_db的实现,下文4.3.2节阐述)
    knowledge_base,包含
    kb_cache
        base.py (涉及KBServiceFactory的实现,下文4.1.2节阐述)
         faiss_cache.py


    kb_service
        __init__.py
    base.py
         default_kb_service.py
         faiss_kb_service.py
         milvus_kb_service.py
         pg_kb_service.py


    __init__.py
    kb_api.py
    kb_doc_api.py (这个实现了多文档问答的最核心逻辑,下文4.1.1节阐述)
    migrate.py
    utils.pymodel_workersstatic
等分文件夹
4.1 server/knowledge_base:基于批量文档的企业知识库问答

该项目的最新版中实现了基于批量文档的问答,比如
  1. # 开始遍历自定义的文档集合(docs)
  2. for file_name, v in docs.items():
  3.     try:
  4.         # 对于v中的每个条目,检查它是否已经是Document类型
  5.         # 如果不是,那么将其转换为Document对象
  6.         v = [x if isinstance(x, Document) else Document(**x) for x in v]
  7.         
  8.         # 根据文件名和知识库名称创建KnowledgeFile对象
  9.         kb_file = KnowledgeFile(filename=file_name, knowledge_base_name=knowledge_base_name)
  10.         
  11.         # 在知识库中更新该文件的文档
  12.         kb.update_doc(kb_file, docs=v, not_refresh_vs_cache=True)
  13.         
  14.         # ...
复制代码

4.1.1 knowledge_base /kb_doc_api.py

以下是对该项目文件的逐行分析:Langchain-Chatchat/server/knowledge_base /kb_doc_api.py
总体来说,这段代码主要为知识库文档提供了CRUD操作(创建、读取、更新、删除)及相关的向量化处理
4.1.2 KBServiceFactory的实现:knowledge_base/kb_service/base.py


    导入模块

      基本的Python库如os, operator。用于数据操作和向量化的库如numpy, sklearn。项目内部的模块如langchain.embeddings.base, langchain.docstore.document等。
    SupportedVSType 类

      这是一个简单的类,用于定义支持的向量存储类型。例如:FAISS, MILVUS等。
    KBService 类

      这是一个抽象基类,定义了知识库服务的基本功能和行为。初始化函数:给定一个知识库名和嵌入模型名称,进行初始化。
    • 提供了一系列方法,如
      create_kb (创建知识库)   
      1.     # 创建知识库方法
      2.     def create_kb(self):
      3.         # 检查doc_path路径是否存在
      4.         if not os.path.exists(self.doc_path):
      5.             # 如果不存在,创建该目录
      6.             os.makedirs(self.doc_path)
      7.         # 调用子类中定义的do_create_kb方法来执行具体的知识库创建过程
      8.         self.do_create_kb()
      9.         # 将新的知识库添加到数据库中,并返回操作状态
      10.         status = add_kb_to_db(self.kb_name, self.vs_type(), self.embed_model)
      11.         # 返回操作状态
      12.         return status
      复制代码

      其中的add_kb_to_db将在下文的“4.3.1 knowledge_base_repository.py:实现add_kb_to_db”中分析

      add_doc (向知识库添加文件)   
      1.     # 向知识库添加文件方法
      2.     def add_doc(self, kb_file: KnowledgeFile, docs: List[Document] = [], **kwargs):
      3.         # 判断docs列表是否有内容
      4.         if docs:
      5.             # 设置一个标志,表示这是自定义文档列表
      6.             custom_docs = True

      7.             # 遍历传入的文档列表
      8.             for doc in docs:
      9.                 # 为每个文档的metadata设置默认的"source"属性,值为文件的路径
      10.                 doc.metadata.setdefault("source", kb_file.filepath)
      11.         else:
      12.             # 如果没有提供docs,从kb_file中读取文档内容
      13.             docs = kb_file.file2text()
      14.             # 设置一个标志,表示这不是自定义文档列表
      15.             custom_docs = False

      16.         # 如果docs列表有内容
      17.         if docs:
      18.             # 删除与kb_file相关联的现有文档
      19.             self.delete_doc(kb_file)

      20.             # 调用子类中定义的do_add_doc方法来执行具体的文档添加过程,并返回文档信息
      21.             doc_infos = self.do_add_doc(docs, **kwargs)

      22.             # 将新的文档信息添加到数据库中,并返回操作状态
      23.             status = add_file_to_db(kb_file,
      24.                                     custom_docs=custom_docs,
      25.                                     docs_count=len(docs),
      26.                                     doc_infos=doc_infos)
      27.         else:
      28.             # 如果docs列表为空,则设置操作状态为False
      29.             status = False
      30.         # 返回操作状态
      31.         return status
      复制代码

      其中add_file_to_db将在下文的“4.3.2 knowledge_file_repository.py:实现add_file_to_db/add_docs_to_db”中分析还有一些抽象方法,如do_create_kb、do_search等,子类必须实现这些方法
    KBServiceFactory 类

      这是一个工厂类,根据提供的向量存储类型返回相应的知识库服务实例
    • 使用静态方法获取不同的知识库服务实例   
      1. # 知识库服务工厂类
      2. class KBServiceFactory:

      3.     # 根据向量存储类型返回相应的知识库服务实例
      4.     @staticmethod
      5.     def get_instance(vs_type: str, knowledge_base_name: str) -> KBService:
      6.         if vs_type == SupportedVSType.FAISS:
      7.             from server.knowledge_base.kb_faiss import KBServiceFaiss
      8.             return KBServiceFaiss(knowledge_base_name)
      9.         elif vs_type == SupportedVSType.MILVUS:
      10.             from server.knowledge_base.kb_milvus import KBServiceMilvus
      11.             return KBServiceMilvus(knowledge_base_name)
      12.         else:
      13.             raise ValueError(f"Unsupported VS type: {vs_type}")
      复制代码

    get_kb_details 函数

      获取目录和数据库中的所有知识库的详细信息,并将这些信息合并
    get_kb_file_details 函数

      为指定的知识库获取目录和数据库中的所有文件的详细信息,并将这些信息合并。
    EmbeddingsFunAdapter 类

      这是一个适配器类,用于在Embeddings类上添加额外的功能。embed_documents 和 embed_query 方法对输入的文本进行嵌入,并将结果进行标准化。aembed_documents 和 aembed_query 是它们的异步版本。
    score_threshold_process 函数

      这是一个简单的函数,用于在得分低于某个阈值的情况下筛选文档,并返回前k个文档

4.2 server/db/models文件夹的更新

4.2.1 KnowledgeBaseModel的实现

server/db/models/knowledge_base_model.py中实现了
  1. from sqlalchemy import Column, Integer, String, DateTime, func

  2. from server.db.base import Base


  3. class KnowledgeBaseModel(Base):
  4.     """
  5.     知识库模型
  6.     """
  7.     __tablename__ = 'knowledge_base'
  8.     id = Column(Integer, primary_key=True, autoincrement=True, comment='知识库ID')
  9.     kb_name = Column(String(50), comment='知识库名称')
  10.     vs_type = Column(String(50), comment='向量库类型')
  11.     embed_model = Column(String(50), comment='嵌入模型名称')
  12.     file_count = Column(Integer, default=0, comment='文件数量')
  13.     create_time = Column(DateTime, default=func.now(), comment='创建时间')

  14.     def __repr__(self):
  15.         return f"<KnowledgeBase(id='{self.id}', kb_name='{self.kb_name}', vs_type='{self.vs_type}', embed_model='{self.embed_model}', file_count='{self.file_count}', create_time='{self.create_time}')>"
复制代码

4.2.2 KnowledgeFile的实现

经过仔细查找发现,在server/db/models /knowledge_file_model.py项目文件中实现了KnowledgeFile
  1. # 导入sqlalchemy所需的模块和函数
  2. from sqlalchemy import Column, Integer, String, DateTime, Float, Boolean, JSON, func

  3. # 从server.db.base导入Base类,这通常用于ORM的基础模型
  4. from server.db.base import Base

  5. # 定义KnowledgeFileModel类,用于映射“知识文件”数据模型
  6. class KnowledgeFileModel(Base):
  7.     """
  8.     知识文件模型
  9.     """
  10.     __tablename__ = 'knowledge_file'
  11.     id = Column(Integer, primary_key=True, autoincrement=True, comment='知识文件ID')

  12.     file_name = Column(String(255), comment='文件名')
  13.     file_ext = Column(String(10), comment='文件扩展名')
  14.     kb_name = Column(String(50), comment='所属知识库名称')
  15.     document_loader_name = Column(String(50), comment='文档加载器名称')

  16.     text_splitter_name = Column(String(50), comment='文本分割器名称')
  17.     file_version = Column(Integer, default=1, comment='文件版本')
  18.     file_mtime = Column(Float, default=0.0, comment="文件修改时间")
  19.     file_size = Column(Integer, default=0, comment="文件大小")
  20.     custom_docs = Column(Boolean, default=False, comment="是否自定义docs")
  21.     docs_count = Column(Integer, default=0, comment="切分文档数量")
  22.     create_time = Column(DateTime, default=func.now(), comment='创建时间')

  23.     # 定义对象的字符串表示形式
  24.     def __repr__(self):
  25.         return f"<KnowledgeFile(id='{self.id}', file_name='{self.file_name}',
  26. file_ext='{self.file_ext}', kb_name='{self.kb_name}',
  27. document_loader_name='{self.document_loader_name}',
  28. text_splitter_name='{self.text_splitter_name}',
  29. file_version='{self.file_version}', create_time='{self.create_time}')>"

  30. # 定义FileDocModel类,用于映射“文件-向量库文档”数据模型
  31. class FileDocModel(Base):
  32.     """
  33.     文件-向量库文档模型
  34.     """
  35.     # 定义表名为'file_doc'
  36.     __tablename__ = 'file_doc'

  37.     # 定义id字段为主键,并设置自动递增,并且附加注释
  38.     id = Column(Integer, primary_key=True, autoincrement=True, comment='ID')
  39.     # 定义知识库名称字段,并附加注释
  40.     kb_name = Column(String(50), comment='知识库名称')

  41.     # 定义文件名称字段,并附加注释
  42.     file_name = Column(String(255), comment='文件名称')

  43.     # 定义向量库文档ID字段,并附加注释
  44.     doc_id = Column(String(50), comment="向量库文档ID")
  45.     # 定义元数据字段,默认为一个空字典
  46.     meta_data = Column(JSON, default={})

  47.     # 定义对象的字符串表示形式
  48.     def __repr__(self):
  49.         return f"<FileDoc(id='{self.id}', kb_name='{self.kb_name}', file_name='{self.file_name}', doc_id='{self.doc_id}', metadata='{self.metadata}')>"
复制代码

4.3 server/db/repository

4.3.1 knowledge_base_repository.py:实现add_kb_to_db

  1. def add_kb_to_db(session, kb_name, vs_type, embed_model):
  2.     # 查询指定名称的知识库是否存在于数据库中
  3.     kb = session.query(KnowledgeBaseModel).filter_by(kb_name=kb_name).first()
  4.    
  5.     # 如果指定的知识库不存在,则创建一个新的知识库实例
  6.     if not kb:
  7.         kb = KnowledgeBaseModel(kb_name=kb_name, vs_type=vs_type, embed_model=embed_model)
  8.         # 将新的知识库实例添加到session,这样可以在之后提交到数据库
  9.         session.add(kb)
  10.     else:  # 如果知识库已经存在,则更新它的vs_type和embed_model
  11.         kb.vs_type = vs_type
  12.         kb.embed_model = embed_model
  13.    
  14.     # 返回True,表示操作成功完成
  15.     return True
复制代码

至于其中的KnowledgeBaseModel方法,已经在上文的“4.2.1 KnowledgeBaseModel的实现”中分析了
4.3.2 knowledge_file_repository.py:实现add_file_to_db/add_docs_to_db


    add_file_to_db,将添加文件到数据库,最后会调用add_docs_to_dbadd_docs_to_db,将文档添加到数据库
  1. # 定义向数据库添加文件的函数
  2. def add_file_to_db(session,  # 数据库会话对象
  3.                 kb_file: KnowledgeFile,       # 知识文件对象
  4.                 docs_count: int = 0,           # 文档数量,默认为0
  5.                 custom_docs: bool = False,     # 是否为自定义文档,默认为False
  6.                 doc_infos: List[str] = [],     # 文档信息列表,默认为空。形式为:[{"id": str, "metadata": dict}, ...]
  7.                 ):
  8.     # 从数据库中查询与知识库名相匹配的知识库记录
  9.     kb = session.query(KnowledgeBaseModel).filter_by(kb_name=kb_file.kb_name).first()

  10.     # 如果该知识库存在
  11.     if kb:
  12.         # 查询与文件名和知识库名相匹配的文件记录
  13.         existing_file: KnowledgeFileModel = (session.query(KnowledgeFileModel)
  14.                                              .filter_by(file_name=kb_file.filename,
  15.                                                         kb_name=kb_file.kb_name)
  16.                                             .first())
  17.         # 获取文件的修改时间
  18.         mtime = kb_file.get_mtime()
  19.         # 获取文件的大小
  20.         size = kb_file.get_size()

  21.         # 如果该文件已存在
  22.         if existing_file:
  23.             # 更新文件的修改时间
  24.             existing_file.file_mtime = mtime
  25.             # 更新文件的大小
  26.             existing_file.file_size = size
  27.             # 更新文档数量
  28.             existing_file.docs_count = docs_count
  29.             # 更新自定义文档标志
  30.             existing_file.custom_docs = custom_docs
  31.             # 文件版本号自增
  32.             existing_file.file_version += 1
  33.         # 如果文件不存在
  34.         else:
  35.             # 创建一个新的文件记录对象
  36.             new_file = KnowledgeFileModel(
  37.                 file_name=kb_file.filename,
  38.                 file_ext=kb_file.ext,
  39.                 kb_name=kb_file.kb_name,
  40.                 document_loader_name=kb_file.document_loader_name,
  41.                 text_splitter_name=kb_file.text_splitter_name or "SpacyTextSplitter",
  42.                 file_mtime=mtime,
  43.                 file_size=size,
  44.                 docs_count = docs_count,
  45.                 custom_docs=custom_docs,
  46.             )
  47.             # 知识库的文件计数增加
  48.             kb.file_count += 1
  49.             # 将新文件添加到数据库会话中
  50.             session.add(new_file)

  51.         # 添加文档到数据库
  52.         add_docs_to_db(kb_name=kb_file.kb_name, file_name=kb_file.filename, doc_infos=doc_infos)

  53.     # 返回True表示操作成功
  54.     return True
复制代码

通过查看上面的倒数第二行代码可知,add_file_to_db最后调用add_docs_to_db以实现添加文档到数据库
  1. def add_docs_to_db(session,
  2.                    kb_name: str,
  3.                    file_name: str,
  4.                    doc_infos: List[Dict]):
  5.     '''
  6.     将某知识库某文件对应的所有Document信息添加到数据库
  7.     doc_infos形式:[{"id": str, "metadata": dict}, ...]
  8.     '''
  9.     for d in doc_infos:
  10.         obj = FileDocModel(
  11.             kb_name=kb_name,
  12.             file_name=file_name,
  13.             doc_id=d["id"],
  14.             meta_data=d["metadata"],
  15.         )
  16.         session.add(obj)
  17.     return True
复制代码

更多暂先课上见:七月LLM与langchain/知识图谱/数据库的实战 [解决问题、实用为王],再之后继续更新本文
第五部分 langchain-chatchat的二次开发:商用时的典型问题及其改进方案

上述这个langchain-chatchat开源项目虽好,但真正落地商用时,会遇到各种工程问题,包括且不限于

    如何解决检索出错:embedding算法是关键如何解决检索对了但不根据知识库回答而是根据模型自有的预训练知识回答如何针对结构化文档采取更好的chunk分割:基于规则如何解决非结构化文档分割不够准确的问题:比如最好按照语义切分如何确保召回结果的全面性与准确性:多路召回与最后的去重/精排如何解决基于文档中表格的问答..
以上内容,请详见《知识库问答Langchain-Chatchat的二次开发:商用时的典型问题及其改进方案

参考文献与推荐阅读

后记

本文经历了三个阶段

    对langchain的梳理
    langchain的组件很多,想理解透彻的话,需要一步步来
    包括我自己刚开始看这个库的时候 真心是晕,无从下手,后来10天过后,可以直接一个文件一个文件的点开 直接看..
    总之,凡事都是一个过程对langchain-ChatGLM项目源码的解读
    说实话,一开始也是挺晕的,因为各种项目文件又很多,好在后来历时一周总算梳理清楚了开写新的:第四部分 23年9月升级版Langchain-Chatchat的源码解析
创作、修改、优化记录


    7.5-7.9日,每天写一一部分7.10,完善第一部分关于什么是langchain的介绍7.11,根据langchain-ChatGLM项目的最新更新,整理已写内容7.12 写完前3.8节,且根据项目流程调整各个文件夹的解读顺序
    相当于历时近一周,总算把 “langchain-ChatGLM的整体代码架构” 梳理清楚了7.15,补充langchain架构相关的内容,且为方便理解,把整个langchain库划分为三个大层:基础层、能力层、应用层7.17,开始写第四部分,重点是4.2节:用知识图谱增强 LLM的预训练、推理、可解释性7.26,续写第四部分,开始更新第五部分:LLM与数据库的结合9.15,把原来的第四部分、第五部分抽取出来独立成一篇新的文章:知识图谱通俗导论:从什么是KG到LLM与KG/DB的结合实战9.16,开写新的:第四部分 23年9月升级版Langchain-Chatchat的源码解析10.26,补充针对“faiss库的search函数”的细致说明及代码实现,这点对理解查找K个相似的向量等相关流程很重要11.8,因得知好几个大厂的AI部门在密切关注本文的更新进度,故更新了二次开发时遇到的几个问题作为示例12.27,补充为何要进行切割的原因,以及补充一个 embedding 的 benchmark



版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/v_JULY_v/article/details/131552592


回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关注0

粉丝0

帖子56

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )