123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- from enum import Enum
- from pydantic import BaseModel, Field
- from typing import Dict, List, Literal, Optional, Union
- import time
- import re
- import json
- import uuid
- from qwen_agent.messages.plan_message import PlanInfo
- class ChatResponseClientMessage(BaseModel):
- name: Literal["message.chunk", "message.whole", "info.agent"] = None
- role: Literal["user", "assistant"] = None
- content: str = None
- finish_reason: Optional[Literal["stop", "flush"]] = None
- extra_info: Optional[Dict] = None
- class ChatResponseStreamChoice(BaseModel):
- role: Literal["user", "assistant"] = None
- delta: str = None
- finish_reason: Optional[Literal["stop", "flush"]] = None
- class SystemSignal(BaseModel):
- signal: Literal["user_clarification"]
- class ChatResponseChoice(BaseModel):
- role: Literal["user", "assistant", "system", "function", "info"] = "info"
- content: Optional[str] = None
- content_simple: Optional[str] = None
- function_call: Optional[Dict] = None
- clarification: Optional[Dict] = None
- insight: Optional[Dict] = None
- finished: Optional[bool] = None
- history: Optional[bool] = None
- choice_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", ""))
- def clean_text(self):
- if self.content:
- self.content = self.content.strip()
- self.content = re.sub('\n+', '\n', self.content)
- def simplify(self):
- lines = self.content.split('\n')
- simple_list = []
- for line in lines:
- line_low = line.lower()
- if line_low.startswith('final:') or line_low.startswith('thought:') \
- or line_low.startswith('用户的原始question为') \
- or line_low.startswith('question:'):
- # or line_low.startswith('通过执行[') \
- # or line_low.startswith('instruction:') \
- # or line_low.startswith('output:'):
- # or line_low.startswith('action:') or line_low.startswith('action input:') \
- # or line_low.startswith('已经执行结束的action如下:') \
- # or line_low.startswith('需要执行的action如下:') \
- # or line_low.startswith('sql code:')
- continue
- elif line:
- simple_list.append(line)
- # print("lines:", lines)
- # print("simple list:", simple_list)
- self.content_simple = '\n'.join(simple_list)
- class AgentCompletionResponse(BaseModel):
- choices: List[ChatResponseChoice] = []
- agent_name: str
- agent_name_cn: str
- llm_name: str
- instruction: Optional[str] = None
- exec_res: Optional[str] = None
- executed: bool = False
- history: Optional[bool] = None
- add_to_context: bool = None
- add_to_final: Optional[bool] = None
- created: Optional[int] = Field(default_factory=lambda: int(time.time()))
- agent_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", ""))
- sql_code: Optional[str] = None
- gis_result: Optional[object] = None
- def get_summary_str(self, add_sql_code):
- if self.agent_name == 'SpatialAnalysisAgent':
- self.gis_result = dict({
- 'originWkt': self.exec_res['原图形'],
- 'result': self.exec_res['result'],
- 'intersectWKts': self.exec_res['相交的图形'],
- })
- del self.exec_res['result']
- del self.exec_res['相交的图形']
- del self.exec_res['原图形']
- if self.executed:
- if add_sql_code and self.sql_code:
- return f"通过执行[{self.agent_name}]接口\nInstruction:{self.instruction}\nSQL Code:{self.sql_code}\nOutput:{self.exec_res}"
- else:
- return f"通过执行[{self.agent_name}]接口\nInstruction:{self.instruction}\nOutput:{self.exec_res}"
- else:
- return f"Instruction: {self.instruction}"
- class PlanCompletionResponse(BaseModel):
- agent_responses: List[AgentCompletionResponse] = []
- user_request: str = None
- history: List[str] = []
- created: Optional[int] = Field(default_factory=lambda: int(time.time()))
- conversation_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", ""))
- class PlanResponseContextManager(BaseModel):
- plan_response: Optional[PlanCompletionResponse] = Field(default_factory=lambda: PlanCompletionResponse())
- last_message_finish: bool = True
- last_agent_finish: bool = True
- executing_agent: Optional[AgentCompletionResponse] = None
- user_request: str = None
- plans: List[PlanInfo] = []
- plan_msg: str = None
- system_signal: Optional[SystemSignal] = None
- clarification_data: Optional[str] = None
- has_search_db: bool = None
- has_sql_data: bool = False
- def init_chat(self, user_request: str, history):
- self.plan_response.user_request = user_request
- self.plan_response.history = history
- self.user_request = user_request
- def _add_message_to_agent_response(self, message: Union[ChatResponseStreamChoice, ChatResponseChoice]):
- if isinstance(message, ChatResponseStreamChoice):
- if self.last_message_finish and message.delta:
- self.executing_agent.choices.append(ChatResponseChoice(role=message.role, content=message.delta))
- self.last_message_finish = False
- elif message.delta:
- assert message.role is None or message.role == self.executing_agent.choices[-1].role
- self.executing_agent.choices[-1].content += message.delta
- if message.finish_reason == "stop":
- self.last_message_finish = True
- self.executing_agent.choices[-1].clean_text()
- self.executing_agent.choices[-1].simplify()
- self.executing_agent.choices[-1].finished = True
- elif message.finish_reason == "flush" and not self.last_message_finish:
- self.executing_agent.choices.pop()
- self.last_message_finish = True
- elif isinstance(message, ChatResponseChoice):
- message.clean_text()
- self.executing_agent.choices.append(message)
- self.executing_agent.choices[-1].simplify()
- self.executing_agent.choices[-1].finished = True
- self.last_message_finish = True
- def add_message(self, agent_name: str, message: Union[ChatResponseStreamChoice, ChatResponseChoice]):
- assert agent_name == self.executing_agent.agent_name
- self._add_message_to_agent_response(message)
- def add_executing_agent_info(self, agent_name, llm_name, instruction=None, add_to_context=False, add_to_final=None):
- assert self.last_agent_finish is True
- print(
- f'add agent, agent_name: {agent_name}, llm_name: {llm_name}, instruction: {instruction}, add_to_context: {add_to_context}, add_to_final: {add_to_final}')
- from agent_config import AgentCNNameDict
- self.executing_agent = AgentCompletionResponse(
- agent_name=agent_name, llm_name=llm_name, agent_name_cn=AgentCNNameDict[agent_name],
- instruction=instruction, executed=False,
- add_to_context=add_to_context, add_to_final=add_to_final
- )
- self.last_message_finish = True
- self.last_agent_finish = False
- def set_last_plan_execute(self, exec_res):
- self.executing_agent.exec_res = exec_res
- if len(self.executing_agent.choices) > 0 and exec_res == self.executing_agent.choices[-1].content:
- self.executing_agent.exec_res = self.executing_agent.choices[-1].content_simple
- self.executing_agent.choices[-1].content_simple = None
- self.executing_agent.choices[-1].finished = True
- # 用来判断是否包含空间分析的结果
- self.plan_response.agent_responses.append(self.executing_agent)
- self.last_agent_finish = True
- self.executing_agent.executed = True
- self.executing_agent = None
- def get_last_plan_output(self):
- return self.plan_response.agent_responses[-1].exec_res
- def get_context(self, add_user_request=True, executed=True, add_plan_msg=False, add_sql_code=False):
- result = ''
- if self.user_request and add_user_request:
- result += f'用户的原始Question为:{self.user_request}\n'
- if self.plan_msg and add_plan_msg:
- result += self.plan_msg
- if executed == True:
- summary_str = ''
- idx = 1
- for agent_info in self.plan_response.agent_responses:
- if agent_info.add_to_context:
- summary_str += f'{idx}. {agent_info.get_summary_str(add_sql_code)}\n'
- idx += 1
- if summary_str:
- result += f"已经执行结束的Action如下:\n{summary_str}\n"
- if self.executing_agent:
- result += f"需要执行的Action如下:\n{self.executing_agent.get_summary_str(add_sql_code)}\n"
- return result
- def get_summary_context(self):
- result = f'用户的原始Question为:{self.user_request}\n'
- summary_str = ''
- idx = 1
- for agent_info in self.plan_response.agent_responses:
- if agent_info.add_to_context and agent_info.add_to_final:
- summary_str += f'{idx}. {agent_info.get_summary_str(False)}\n'
- idx += 1
- if summary_str:
- result += f"已经执行的结果如下:\n{summary_str}\n"
- return result
- def response_json(self):
- res = self.plan_response.model_dump(exclude_none=True)
- if self.executing_agent:
- res['agent_responses'].append(self.executing_agent.model_dump(exclude_none=True))
- return json.dumps(res, ensure_ascii=False, default=enum_serializer)
- # def model_dump_json(self):
- # return self.model_dump_json(exclude_none=True)
- # 自定义序列化Enum的函数
- def enum_serializer(x):
- if isinstance(x, Enum):
- return x.name
- raise TypeError("Unknown type")
- if __name__ == '__main__':
- plan_response = PlanResponseContextManager("你好", [])
|