context_message.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. from enum import Enum
  2. from pydantic import BaseModel, Field
  3. from typing import Dict, List, Literal, Optional, Union
  4. import time
  5. import re
  6. import json
  7. import uuid
  8. from qwen_agent.messages.plan_message import PlanInfo
  9. class ChatResponseClientMessage(BaseModel):
  10. name: Literal["message.chunk", "message.whole", "info.agent"] = None
  11. role: Literal["user", "assistant"] = None
  12. content: str = None
  13. finish_reason: Optional[Literal["stop", "flush"]] = None
  14. extra_info: Optional[Dict] = None
  15. class ChatResponseStreamChoice(BaseModel):
  16. role: Literal["user", "assistant"] = None
  17. delta: str = None
  18. finish_reason: Optional[Literal["stop", "flush"]] = None
  19. class SystemSignal(BaseModel):
  20. signal: Literal["user_clarification"]
  21. class ChatResponseChoice(BaseModel):
  22. role: Literal["user", "assistant", "system", "function", "info"] = "info"
  23. content: Optional[str] = None
  24. content_simple: Optional[str] = None
  25. function_call: Optional[Dict] = None
  26. clarification: Optional[Dict] = None
  27. insight: Optional[Dict] = None
  28. finished: Optional[bool] = None
  29. history: Optional[bool] = None
  30. choice_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", ""))
  31. def clean_text(self):
  32. if self.content:
  33. self.content = self.content.strip()
  34. self.content = re.sub('\n+', '\n', self.content)
  35. def simplify(self):
  36. lines = self.content.split('\n')
  37. simple_list = []
  38. for line in lines:
  39. line_low = line.lower()
  40. if line_low.startswith('final:') or line_low.startswith('thought:') \
  41. or line_low.startswith('用户的原始question为') \
  42. or line_low.startswith('question:'):
  43. # or line_low.startswith('通过执行[') \
  44. # or line_low.startswith('instruction:') \
  45. # or line_low.startswith('output:'):
  46. # or line_low.startswith('action:') or line_low.startswith('action input:') \
  47. # or line_low.startswith('已经执行结束的action如下:') \
  48. # or line_low.startswith('需要执行的action如下:') \
  49. # or line_low.startswith('sql code:')
  50. continue
  51. elif line:
  52. simple_list.append(line)
  53. # print("lines:", lines)
  54. # print("simple list:", simple_list)
  55. self.content_simple = '\n'.join(simple_list)
  56. class AgentCompletionResponse(BaseModel):
  57. choices: List[ChatResponseChoice] = []
  58. agent_name: str
  59. agent_name_cn: str
  60. llm_name: str
  61. instruction: Optional[str] = None
  62. exec_res: Optional[str] = None
  63. executed: bool = False
  64. history: Optional[bool] = None
  65. add_to_context: bool = None
  66. add_to_final: Optional[bool] = None
  67. created: Optional[int] = Field(default_factory=lambda: int(time.time()))
  68. agent_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", ""))
  69. sql_code: Optional[str] = None
  70. gis_result: Optional[object] = None
  71. def get_summary_str(self, add_sql_code):
  72. if self.agent_name == 'SpatialAnalysisAgent':
  73. self.gis_result = dict({
  74. 'originWkt': self.exec_res['原图形'],
  75. 'result': self.exec_res['result'],
  76. 'intersectWKts': self.exec_res['相交的图形'],
  77. })
  78. del self.exec_res['result']
  79. del self.exec_res['相交的图形']
  80. del self.exec_res['原图形']
  81. if self.executed:
  82. if add_sql_code and self.sql_code:
  83. return f"通过执行[{self.agent_name}]接口\nInstruction:{self.instruction}\nSQL Code:{self.sql_code}\nOutput:{self.exec_res}"
  84. else:
  85. return f"通过执行[{self.agent_name}]接口\nInstruction:{self.instruction}\nOutput:{self.exec_res}"
  86. else:
  87. return f"Instruction: {self.instruction}"
  88. class PlanCompletionResponse(BaseModel):
  89. agent_responses: List[AgentCompletionResponse] = []
  90. user_request: str = None
  91. history: List[str] = []
  92. created: Optional[int] = Field(default_factory=lambda: int(time.time()))
  93. conversation_id: Optional[str] = Field(default_factory=lambda: str(uuid.uuid4()).replace("-", ""))
  94. class PlanResponseContextManager(BaseModel):
  95. plan_response: Optional[PlanCompletionResponse] = Field(default_factory=lambda: PlanCompletionResponse())
  96. last_message_finish: bool = True
  97. last_agent_finish: bool = True
  98. executing_agent: Optional[AgentCompletionResponse] = None
  99. user_request: str = None
  100. plans: List[PlanInfo] = []
  101. plan_msg: str = None
  102. system_signal: Optional[SystemSignal] = None
  103. clarification_data: Optional[str] = None
  104. has_search_db: bool = None
  105. has_sql_data: bool = False
  106. def init_chat(self, user_request: str, history):
  107. self.plan_response.user_request = user_request
  108. self.plan_response.history = history
  109. self.user_request = user_request
  110. def _add_message_to_agent_response(self, message: Union[ChatResponseStreamChoice, ChatResponseChoice]):
  111. if isinstance(message, ChatResponseStreamChoice):
  112. if self.last_message_finish and message.delta:
  113. self.executing_agent.choices.append(ChatResponseChoice(role=message.role, content=message.delta))
  114. self.last_message_finish = False
  115. elif message.delta:
  116. assert message.role is None or message.role == self.executing_agent.choices[-1].role
  117. self.executing_agent.choices[-1].content += message.delta
  118. if message.finish_reason == "stop":
  119. self.last_message_finish = True
  120. self.executing_agent.choices[-1].clean_text()
  121. self.executing_agent.choices[-1].simplify()
  122. self.executing_agent.choices[-1].finished = True
  123. elif message.finish_reason == "flush" and not self.last_message_finish:
  124. self.executing_agent.choices.pop()
  125. self.last_message_finish = True
  126. elif isinstance(message, ChatResponseChoice):
  127. message.clean_text()
  128. self.executing_agent.choices.append(message)
  129. self.executing_agent.choices[-1].simplify()
  130. self.executing_agent.choices[-1].finished = True
  131. self.last_message_finish = True
  132. def add_message(self, agent_name: str, message: Union[ChatResponseStreamChoice, ChatResponseChoice]):
  133. assert agent_name == self.executing_agent.agent_name
  134. self._add_message_to_agent_response(message)
  135. def add_executing_agent_info(self, agent_name, llm_name, instruction=None, add_to_context=False, add_to_final=None):
  136. assert self.last_agent_finish is True
  137. print(
  138. f'add agent, agent_name: {agent_name}, llm_name: {llm_name}, instruction: {instruction}, add_to_context: {add_to_context}, add_to_final: {add_to_final}')
  139. from agent_config import AgentCNNameDict
  140. self.executing_agent = AgentCompletionResponse(
  141. agent_name=agent_name, llm_name=llm_name, agent_name_cn=AgentCNNameDict[agent_name],
  142. instruction=instruction, executed=False,
  143. add_to_context=add_to_context, add_to_final=add_to_final
  144. )
  145. self.last_message_finish = True
  146. self.last_agent_finish = False
  147. def set_last_plan_execute(self, exec_res):
  148. self.executing_agent.exec_res = exec_res
  149. if len(self.executing_agent.choices) > 0 and exec_res == self.executing_agent.choices[-1].content:
  150. self.executing_agent.exec_res = self.executing_agent.choices[-1].content_simple
  151. self.executing_agent.choices[-1].content_simple = None
  152. self.executing_agent.choices[-1].finished = True
  153. # 用来判断是否包含空间分析的结果
  154. self.plan_response.agent_responses.append(self.executing_agent)
  155. self.last_agent_finish = True
  156. self.executing_agent.executed = True
  157. self.executing_agent = None
  158. def get_last_plan_output(self):
  159. return self.plan_response.agent_responses[-1].exec_res
  160. def get_context(self, add_user_request=True, executed=True, add_plan_msg=False, add_sql_code=False):
  161. result = ''
  162. if self.user_request and add_user_request:
  163. result += f'用户的原始Question为:{self.user_request}\n'
  164. if self.plan_msg and add_plan_msg:
  165. result += self.plan_msg
  166. if executed == True:
  167. summary_str = ''
  168. idx = 1
  169. for agent_info in self.plan_response.agent_responses:
  170. if agent_info.add_to_context:
  171. summary_str += f'{idx}. {agent_info.get_summary_str(add_sql_code)}\n'
  172. idx += 1
  173. if summary_str:
  174. result += f"已经执行结束的Action如下:\n{summary_str}\n"
  175. if self.executing_agent:
  176. result += f"需要执行的Action如下:\n{self.executing_agent.get_summary_str(add_sql_code)}\n"
  177. return result
  178. def get_summary_context(self):
  179. result = f'用户的原始Question为:{self.user_request}\n'
  180. summary_str = ''
  181. idx = 1
  182. for agent_info in self.plan_response.agent_responses:
  183. if agent_info.add_to_context and agent_info.add_to_final:
  184. summary_str += f'{idx}. {agent_info.get_summary_str(False)}\n'
  185. idx += 1
  186. if summary_str:
  187. result += f"已经执行的结果如下:\n{summary_str}\n"
  188. return result
  189. def response_json(self):
  190. res = self.plan_response.model_dump(exclude_none=True)
  191. if self.executing_agent:
  192. res['agent_responses'].append(self.executing_agent.model_dump(exclude_none=True))
  193. return json.dumps(res, ensure_ascii=False, default=enum_serializer)
  194. # def model_dump_json(self):
  195. # return self.model_dump_json(exclude_none=True)
  196. # 自定义序列化Enum的函数
  197. def enum_serializer(x):
  198. if isinstance(x, Enum):
  199. return x.name
  200. raise TypeError("Unknown type")
  201. if __name__ == '__main__':
  202. plan_response = PlanResponseContextManager("你好", [])