作者:dylan55_you
1. 引言
智能体(Agent)是一个使用大语言模型(LLM)来决定应用程序控制流程的系统。随着这些系统的开发,它们可能会变得越来越复杂,难以管理和扩展。例如,你可能会遇到以下问题:
智能体可用的工具过多,难以决定下一步调用哪个工具。对于单个智能体来说,上下文变得过于复杂,难以跟踪。系统中需要多个专业领域(例如,规划师、研究员、数学专家等)。
为了解决这些问题,你可能会考虑将应用程序分解为多个更小、独立的智能体,并将它们组合成一个多智能体系统。这些独立的智能体可以简单到一个提示词(prompt)加一次 LLM 调用,也可以复杂到一个 ReAct 智能体(甚至更复杂!)。
随着智能体框架的发展,许多公司开始构建自己的多智能体系统,并寻找能够解决所有智能体任务的“银弹”方案。两年前,研究人员设计了一个名为 ChatDev 的多智能体协作系统。ChatDev 是一个虚拟软件公司,通过拥有不同角色(如首席执行官、首席产品官、艺术设计师、程序员、评审员、测试员等,就像一个常规的软件工程公司一样)的各种智能体来运作。
所有这些智能体协同工作,相互交流,最终成功创建了一款视频游戏。在这一成就之后,许多人认为任何软件工程任务都可以使用这种多智能体架构来解决,即每个 AI 都有明确的角色分工。然而,现实世界的实验表明,并非所有问题都能用相同的架构解决。在某些情况下,更简单的架构可能提供更有效、更具成本效益的解决方案。
1.1 单智能体 vs. 多智能体架构
起初,单智能体方法(即一个 AI 智能体可以处理所有事情,从浏览器导航到文件操作)可能是合理的。然而,随着时间的推移,随着任务变得更加复杂和工具数量的增长,我们的单智能体方法将开始面临挑战。
当智能体开始出现异常行为时,我们会注意到以下影响,其原因可能包括:
工具过多:智能体对使用哪些工具和/或何时使用感到困惑。上下文过多:智能体越来越大的上下文窗口包含了过多的工具信息。错误过多:由于职责过于宽泛,智能体开始产生次优或不正确的结果。
当我们开始自动化多个不同的子任务(如数据提取或报告生成)时,可能就到了分离职责的时候了。通过使用多个 AI 智能体,每个智能体专注于自己的领域和工具集,我们可以提高解决方案的清晰度和质量。这不仅使智能体变得更有效,而且也简化了智能体本身的开发。
2. 多智能体架构
正如你所见,单智能体和多智能体架构都各有优缺点。当任务直接且定义明确,并且没有特定的资源限制时,单智能体架构是理想的选择。另一方面,当用例复杂且动态、需要更专业的知识和协作、或有可扩展性和适应性要求时,多智能体架构会很有帮助。
2.1 多智能体系统中的模式
在多智能体系统中,有多种连接智能体的方式:
2.1.1 并行 (Parallel)
多个智能体同时处理任务的不同部分。
示例:我们希望使用 3 个智能体同时对给定文本进行摘要、翻译和情绪分析。
- from typing import Dict, Any, TypedDict
- from langgraph.graph import StateGraph, END
- from langchain_core.runnables import RunnableConfig
- from textblob import TextBlob
- import re
- import time
- # Define the state
- class AgentState(TypedDict):
- text: str
- summary: str
- translation: str
- sentiment: str
- summary_time: float
- translation_time: float
- sentiment_time: float
- # Summarization Agent
- def summarize_agent(state: AgentState) -> Dict[str, Any]:
- print("Summarization Agent: Running")
- start_time = time.time()
-
- try:
- text = state["text"]
- if not text.strip():
- return {
- "summary": "No text provided for summarization.",
- "summary_time": 0.0
- }
-
- time.sleep(2)
- sentences = re.split(r'(?<=[.!?]) +', text.strip())
- scored_sentences = [(s, len(s.split())) for s in sentences if s]
- top_sentences = [s for s, _ in sorted(scored_sentences, key=lambda x: x[1], reverse=True)[:2]]
- summary = " ".join(top_sentences) if top_sentences else "Text too short to summarize."
-
- processing_time = time.time() - start_time
- print(f"Summarization Agent: Completed in {processing_time:.2f} seconds")
-
- return {
- "summary": summary,
- "summary_time": processing_time
- }
- except Exception as e:
- return {
- "summary": f"Error in summarization: {str(e)}",
- "summary_time": 0.0
- }
- # Translation Agent
- def translate_agent(state: AgentState) -> Dict[str, Any]:
- print("Translation Agent: Running")
- start_time = time.time()
-
- try:
- text = state["text"]
- if not text.strip():
- return {
- "translation": "No text provided for translation.",
- "translation_time": 0.0
- }
-
- time.sleep(3)
- translation = (
- "El nuevo parque en la ciudad es una maravillosa adición. "
- "Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. "
- "Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña."
- )
-
- processing_time = time.time() - start_time
- print(f"Translation Agent: Completed in {processing_time:.2f} seconds")
-
- return {
- "translation": translation,
- "translation_time": processing_time
- }
- except Exception as e:
- return {
- "translation": f"Error in translation: {str(e)}",
- "translation_time": 0.0
- }
- # Sentiment Agent
- def sentiment_agent(state: AgentState) -> Dict[str, Any]:
- print("Sentiment Agent: Running")
- start_time = time.time()
-
- try:
- text = state["text"]
- if not text.strip():
- return {
- "sentiment": "No text provided for sentiment analysis.",
- "sentiment_time": 0.0
- }
-
- time.sleep(1.5)
- blob = TextBlob(text)
- polarity = blob.sentiment.polarity
- subjectivity = blob.sentiment.subjectivity
-
- sentiment = "Positive" if polarity > 0 else "Negative" if polarity < 0 else "Neutral"
- result = f"{sentiment} (Polarity: {polarity:.2f}, Subjectivity: {subjectivity:.2f})"
-
- processing_time = time.time() - start_time
- print(f"Sentiment Agent: Completed in {processing_time:.2f} seconds")
-
- return {
- "sentiment": result,
- "sentiment_time": processing_time
- }
- except Exception as e:
- return {
- "sentiment": f"Error in sentiment analysis: {str(e)}",
- "sentiment_time": 0.0
- }
- # Join Node
- def join_parallel_results(state: AgentState) -> AgentState:
- return state
- # Build the Graph
- def build_parallel_graph() -> StateGraph:
- workflow = StateGraph(AgentState)
-
- # Define parallel branches
- parallel_branches = {
- "summarize_node": summarize_agent,
- "translate_node": translate_agent,
- "sentiment_node": sentiment_agent
- }
-
- # Add parallel processing nodes
- for name, agent in parallel_branches.items():
- workflow.add_node(name, agent)
-
- # Add branching and joining nodes
- workflow.add_node("branch", lambda state: state) # Simplified branch function
- workflow.add_node("join", join_parallel_results)
-
- # Set entry point
- workflow.set_entry_point("branch")
-
- # Add edges for parallel execution
- for name in parallel_branches:
- workflow.add_edge("branch", name)
- workflow.add_edge(name, "join")
-
- workflow.add_edge("join", END)
-
- return workflow.compile()
- # Main function
- def main():
- text = (
- "The new park in the city is a wonderful addition. Families are enjoying the open spaces, "
- "and children love the playground. However, some people think the parking area is too small."
- )
-
- initial_state: AgentState = {
- "text": text,
- "summary": "",
- "translation": "",
- "sentiment": "",
- "summary_time": 0.0,
- "translation_time": 0.0,
- "sentiment_time": 0.0
- }
-
- print("\\nBuilding new graph...")
- app = build_parallel_graph()
-
- print("\\nStarting parallel processing...")
- start_time = time.time()
-
- config = RunnableConfig(parallel=True)
- result = app.invoke(initial_state, config=config)
-
- total_time = time.time() - start_time
-
- print("\\n=== Parallel Task Results ===")
- print(f"Input Text:\\n{text}\\n")
- print(f"Summary:\\n{result['summary']}\\n")
- print(f"Translation (Spanish):\\n{result['translation']}\\n")
- print(f"Sentiment Analysis:\\n{result['sentiment']}\\n")
-
- print("\\n=== Processing Times ===")
- processing_times = {
- "summary": result["summary_time"],
- "translation": result["translation_time"],
- "sentiment": result["sentiment_time"]
- }
- for agent, time_taken in processing_times.items():
- print(f"{agent.capitalize()}: {time_taken:.2f} seconds")
-
- print(f"\\nTotal Wall Clock Time: {total_time:.2f} seconds")
- print(f"Sum of Individual Processing Times: {sum(processing_times.values()):.2f} seconds")
- print(f"Time Saved by Parallel Processing: {sum(processing_times.values()) - total_time:.2f} seconds")
- if __name__ == "__main__":
- main()
复制代码 输出:- Building new graph...
- Starting parallel processing...
- Sentiment Agent: Running
- Summarization Agent: Running
- Translation Agent: Running
- Sentiment Agent: Completed in 1.50 seconds
- Summarization Agent: Completed in 2.00 seconds
- Translation Agent: Completed in 3.00 seconds
- === Parallel Task Results ===
- Input Text:
- The new park in the city is a wonderful addition. Families are enjoying the open spaces, and children love the playground. However, some people think the parking area is too small.
- Summary:
- Families are enjoying the open spaces, and children love the playground. The new park in the city is a wonderful addition.
- Translation (Spanish):
- El nuevo parque en la ciudad es una maravillosa adición. Las familias disfrutan de los espacios abiertos, y a los niños les encanta el parque infantil. Sin embargo, algunas personas piensan que el área de estacionamiento es demasiado pequeña.
- Sentiment Analysis:
- Positive (Polarity: 0.31, Subjectivity: 0.59)
- === Processing Times ===
- Summary: 2.00 seconds
- Translation: 3.00 seconds
- Sentiment: 1.50 seconds
- Total Wall Clock Time: 3.01 seconds
- Sum of Individual Processing Times: 6.50 seconds
- Time Saved by Parallel Processing: 3.50 seconds
复制代码并行性:三个任务(摘要、翻译、情绪分析)同时运行,减少了总处理时间。独立性:每个智能体独立地对输入文本进行操作,执行期间不需要智能体间通信。协调:(代码中的)队列确保结果安全收集并按顺序显示。现实用例:摘要、翻译和情绪分析是常见的 NLP 任务,尤其对于较长的文本,能从并行处理中受益。
2.1.2 顺序 (Sequential)
任务按顺序处理,一个智能体的输出成为下一个智能体的输入。
示例:多步审批流程。- from typing import Dict
- from langgraph.graph import StateGraph, MessagesState, END
- from langchain_core.runnables import RunnableConfig
- from langchain_core.messages import HumanMessage, AIMessage
- import json
- # Agent 1: Team Lead
- def team_lead_agent(state: MessagesState, config: RunnableConfig) -> Dict:
- print("Agent (Team Lead): Starting review")
- messages = state["messages"]
- proposal = json.loads(messages[0].content)
- title = proposal.get("title", "")
- amount = proposal.get("amount", 0.0)
- if not title or amount <= 0:
- status = "Rejected"
- comment = "Team Lead: Proposal rejected due to missing title or invalid amount."
- goto = END
- else:
- status = "Approved by Team Lead"
- comment = "Team Lead: Proposal is complete and approved."
- goto = "dept_manager"
- print(f"Agent (Team Lead): Review complete - {status}")
- messages.append(AIMessage(
- content=json.dumps({"status": status, "comment": comment}),
- additional_kwargs={"agent": "team_lead", "goto": goto}
- ))
- return {"messages": messages}
- # Agent 2: Department Manager
- def dept_manager_agent(state: MessagesState, config: RunnableConfig) -> Dict:
- print("Agent (Department Manager): Starting review")
- messages = state["messages"]
- team_lead_msg = next((m for m in messages if m.additional_kwargs.get("agent") == "team_lead"), None)
- proposal = json.loads(messages[0].content)
- amount = proposal.get("amount", 0.0)
- if json.loads(team_lead_msg.content)["status"] != "Approved by Team Lead":
- status = "Rejected"
- comment = "Department Manager: Skipped due to Team Lead rejection."
-
复制代码 原文地址:https://blog.csdn.net/charce_you/article/details/150851671 |