from enum import Enum from pydantic import BaseModel, Field from typing import Dict, List, Literal, Optional, Union import time import re import json import uuid from qwen_agent.messages.plan_message import PlanInfo class ChatResponseClientMessage(BaseModel): name: Literal["message.chunk", "message.whole", "info.agent"] = None role: Literal["user", "assistant"] = None content: str = None finish_reason: Optional[Literal["stop", "flush"]] = None extra_info: Optional[Dict] = None class ChatResponseStreamChoice(BaseModel): role: Literal["user", "assistant"] = None delta: str = None finish_reason: Optional[Literal["stop", "flush"]] = None class SystemSignal(BaseModel): signal: Literal["user_clarification"] class ChatResponseChoice(BaseModel): role: Literal["user", "assistant", "system", "function", "info"] = "info" content: Optional[str] = None content_simple: Optional[str] = None function_call: Optional[Dict] = None clarification: Optional[Dict] = None insight: Optional[Dict] = None finished: Optional[bool] = None history: Optional[bool] = None choice_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", "")) def clean_text(self): if self.content: self.content = self.content.strip() self.content = re.sub('\n+', '\n', self.content) def simplify(self): lines = self.content.split('\n') simple_list = [] for line in lines: line_low = line.lower() if line_low.startswith('final:') or line_low.startswith('thought:') \ or line_low.startswith('用户的原始question为') \ or line_low.startswith('question:'): # or line_low.startswith('通过执行[') \ # or line_low.startswith('instruction:') \ # or line_low.startswith('output:'): # or line_low.startswith('action:') or line_low.startswith('action input:') \ # or line_low.startswith('已经执行结束的action如下:') \ # or line_low.startswith('需要执行的action如下:') \ # or line_low.startswith('sql code:') continue elif line: simple_list.append(line) # print("lines:", lines) # print("simple list:", simple_list) self.content_simple = '\n'.join(simple_list) class AgentCompletionResponse(BaseModel): choices: List[ChatResponseChoice] = [] agent_name: str agent_name_cn: str llm_name: str instruction: Optional[str] = None exec_res: Optional[str] = None executed: bool = False history: Optional[bool] = None add_to_context: bool = None add_to_final: Optional[bool] = None created: Optional[int] = Field(default_factory=lambda: int(time.time())) agent_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", "")) sql_code: Optional[str] = None gis_result: Optional[object] = None def get_summary_str(self, add_sql_code): if self.agent_name == 'SpatialAnalysisAgent': self.gis_result = dict({ 'originWkt': self.exec_res['原图形'], 'result': self.exec_res['result'], 'intersectWKts': self.exec_res['相交的图形'], }) del self.exec_res['result'] del self.exec_res['相交的图形'] del self.exec_res['原图形'] if self.executed: if add_sql_code and self.sql_code: return f"通过执行[{self.agent_name}]接口\nInstruction:{self.instruction}\nSQL Code:{self.sql_code}\nOutput:{self.exec_res}" else: return f"通过执行[{self.agent_name}]接口\nInstruction:{self.instruction}\nOutput:{self.exec_res}" else: return f"Instruction: {self.instruction}" class PlanCompletionResponse(BaseModel): agent_responses: List[AgentCompletionResponse] = [] user_request: str = None history: List[str] = [] created: Optional[int] = Field(default_factory=lambda: int(time.time())) conversation_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", "")) class PlanResponseContextManager(BaseModel): plan_response: Optional[PlanCompletionResponse] = Field(default_factory=lambda: PlanCompletionResponse()) last_message_finish: bool = True last_agent_finish: bool = True executing_agent: Optional[AgentCompletionResponse] = None user_request: str = None plans: List[PlanInfo] = [] plan_msg: str = None system_signal: Optional[SystemSignal] = None clarification_data: Optional[str] = None has_search_db: bool = None has_sql_data: bool = False def init_chat(self, user_request: str, history): self.plan_response.user_request = user_request self.plan_response.history = history self.user_request = user_request def _add_message_to_agent_response(self, message: Union[ChatResponseStreamChoice, ChatResponseChoice]): if isinstance(message, ChatResponseStreamChoice): if self.last_message_finish and message.delta: self.executing_agent.choices.append(ChatResponseChoice(role=message.role, content=message.delta)) self.last_message_finish = False elif message.delta: assert message.role is None or message.role == self.executing_agent.choices[-1].role self.executing_agent.choices[-1].content += message.delta if message.finish_reason == "stop": self.last_message_finish = True self.executing_agent.choices[-1].clean_text() self.executing_agent.choices[-1].simplify() self.executing_agent.choices[-1].finished = True elif message.finish_reason == "flush" and not self.last_message_finish: self.executing_agent.choices.pop() self.last_message_finish = True elif isinstance(message, ChatResponseChoice): message.clean_text() self.executing_agent.choices.append(message) self.executing_agent.choices[-1].simplify() self.executing_agent.choices[-1].finished = True self.last_message_finish = True def add_message(self, agent_name: str, message: Union[ChatResponseStreamChoice, ChatResponseChoice]): assert agent_name == self.executing_agent.agent_name self._add_message_to_agent_response(message) def add_executing_agent_info(self, agent_name, llm_name, instruction=None, add_to_context=False, add_to_final=None): assert self.last_agent_finish is True print( f'add agent, agent_name: {agent_name}, llm_name: {llm_name}, instruction: {instruction}, add_to_context: {add_to_context}, add_to_final: {add_to_final}') from agent_config import AgentCNNameDict self.executing_agent = AgentCompletionResponse( agent_name=agent_name, llm_name=llm_name, agent_name_cn=AgentCNNameDict[agent_name], instruction=instruction, executed=False, add_to_context=add_to_context, add_to_final=add_to_final ) self.last_message_finish = True self.last_agent_finish = False def set_last_plan_execute(self, exec_res): self.executing_agent.exec_res = exec_res if len(self.executing_agent.choices) > 0 and exec_res == self.executing_agent.choices[-1].content: self.executing_agent.exec_res = self.executing_agent.choices[-1].content_simple self.executing_agent.choices[-1].content_simple = None self.executing_agent.choices[-1].finished = True # 用来判断是否包含空间分析的结果 self.plan_response.agent_responses.append(self.executing_agent) self.last_agent_finish = True self.executing_agent.executed = True self.executing_agent = None def get_last_plan_output(self): return self.plan_response.agent_responses[-1].exec_res def get_context(self, add_user_request=True, executed=True, add_plan_msg=False, add_sql_code=False): result = '' if self.user_request and add_user_request: result += f'用户的原始Question为:{self.user_request}\n' if self.plan_msg and add_plan_msg: result += self.plan_msg if executed == True: summary_str = '' idx = 1 for agent_info in self.plan_response.agent_responses: if agent_info.add_to_context: summary_str += f'{idx}. {agent_info.get_summary_str(add_sql_code)}\n' idx += 1 if summary_str: result += f"已经执行结束的Action如下:\n{summary_str}\n" if self.executing_agent: result += f"需要执行的Action如下:\n{self.executing_agent.get_summary_str(add_sql_code)}\n" return result def get_summary_context(self): result = f'用户的原始Question为:{self.user_request}\n' summary_str = '' idx = 1 for agent_info in self.plan_response.agent_responses: if agent_info.add_to_context and agent_info.add_to_final: summary_str += f'{idx}. {agent_info.get_summary_str(False)}\n' idx += 1 if summary_str: result += f"已经执行的结果如下:\n{summary_str}\n" return result def response_json(self): res = self.plan_response.model_dump(exclude_none=True) if self.executing_agent: res['agent_responses'].append(self.executing_agent.model_dump(exclude_none=True)) return json.dumps(res, ensure_ascii=False, default=enum_serializer) # def model_dump_json(self): # return self.model_dump_json(exclude_none=True) # 自定义序列化Enum的函数 def enum_serializer(x): if isinstance(x, Enum): return x.name raise TypeError("Unknown type") if __name__ == '__main__': plan_response = PlanResponseContextManager("你好", [])