summary_agent.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import asyncio
  2. import traceback
  3. from typing import List
  4. from httpx import RemoteProtocolError
  5. from qwen_agent.planning.planner import PlanResponseContextManager
  6. from qwen_agent.sub_agent.BaseSubAgent import BaseSubAgent
  7. from qwen_agent.messages.context_message import ChatResponseChoice, ChatResponseStreamChoice
  8. import re
  9. SYSTEM_PROMPT = """
  10. 你是一个对前面提到的分析过程和结果进行总结摘要的专家,给你一段用户的提问,以及之前代码执行的过程和执行结果,你可以对整个分析的过程进行总结摘要,来回答用户提出的问题。
  11. 那么我问你,请对前面提到的分析过程和结果进行总结摘要,回答用户的问题。
  12. 注意:
  13. 1. 如果用户查询的Question通过数据库查询,没有返回结果,请直接回答“数据库中没有查询到相关的数据”,不允许胡编乱造。如果在数据库中查询到相关的结果,请根据结果回答用户问题。
  14. 2. 请不要对show_case的查询到的记录做过与详细的描述;
  15. 3. 如果用户查询的Question没有涉及到数据库查询,比如GIS图层操作,请直接回答没有相关数据。
  16. """
  17. agents_prompt = dict({
  18. 'LandSupplySqlAgent': '返回的数据结果面积单位是亩,金额单位是万元, 楼面价和土地单价的单位是万元/平方米',
  19. 'LandUseSqlAgent': """
  20. 返回的数据结果面积单位是公顷,不要对其进行转换,不要对其四舍五入
  21. 各一级类包含以下二级类:
  22. 湿地:包含以下几种土地利用现状小类 红树林地、森林沼泽、灌丛沼泽、沼泽草地、沿海滩涂、内陆滩涂和沼泽地,
  23. 耕地:包含以下几种土地利用现状小类 水田,水浇地和旱地,
  24. 园林:包含以下几种土地利用现状小类 果园,茶园,橡胶园和其他园地
  25. 林地:包含以下几种土地利用现状小类 乔木林地,竹林地,灌木林地和其他林地
  26. 草地:包含以下几种土地利用现状小类 天然牧草地,人工牧草地和其他草地
  27. 城镇村及工矿用地:包含以下几种土地利用现状小类 城市用地,建制镇用地,村庄用地,采矿用地和风景名胜及特殊用地
  28. 交通运输用地:包含以下几种土地利用现状小类 铁路用地,轨道交通用地,公路用地,农村道路,机场用地,港口码头用地和管道运输用地
  29. 水域及水利设施用地:包含以下几种土地利用现状小类 河流水面,湖泊水面,水库水面,坑塘水面,沟渠,水工建筑用地和冰川及常年积雪
  30. """,
  31. 'SpatialAnalysisAgent': """
  32. 1. 总结时不要输出图形的wkt信息或者其他坐标点信息
  33. """,
  34. 'LandFindSqlAgent':"""
  35. 不对结果进行总结,请直接回答没有相关数据
  36. """,
  37. 'GisLayerOperationAgent':"""
  38. 不对结果进行总结,请直接回答没有相关数据
  39. """,
  40. 'LandSiteSelectionSqlAgent': """
  41. 必须按照markdown格式输出的地块信息。以下是输出信息的参考,请将<>替换成真实的内容:
  42. ### 1.<地块名称>
  43. ### 2.<地块名称>
  44. 下面是输出时的注意事项:
  45. 1.生成markdown 注意必须严格使用换行符,不可章节之间出现没有换行符的情况
  46. 2.必须严格按照上面的结构输出信息
  47. 3.不要输出除上面结构之外的信息
  48. """,
  49. })
  50. class SummaryAgent(BaseSubAgent):
  51. def __init__(self, llm=None, llm_name=None, stream=False, name='summary'):
  52. super().__init__(llm=llm, llm_name=llm_name, stream=stream, name=name)
  53. async def run(self, plan_context: PlanResponseContextManager, messages: List[str]):
  54. query = plan_context.get_summary_context()
  55. _messages = [{
  56. "role": "system",
  57. "content": self.handle_prompt(plan_context, SYSTEM_PROMPT)
  58. }]
  59. _messages.extend(messages)
  60. _messages.append({
  61. 'role': 'user',
  62. 'content': query
  63. })
  64. # print(f"query of summary agent: {query}")
  65. # for msg in _messages:
  66. for i, msg in enumerate(_messages):
  67. if not isinstance(msg, dict):
  68. msg = dict(msg)
  69. if msg['type'].value == 1:
  70. msg['role'] = 'user'
  71. msg['content'] = msg['data']
  72. else:
  73. msg['role'] = 'assistant'
  74. msg['content'] = dict(msg['data'])['exec_res'][0]
  75. msg['history'] = True
  76. del msg['data']
  77. del msg['type']
  78. _messages[i] = msg
  79. if 'history' in msg and msg['history']:
  80. print('is history messsage')
  81. else:
  82. yield ChatResponseChoice(role=msg['role'], content=msg['content'])
  83. retry_round = 0
  84. self.exec_res = 'Summary error, please try again...'
  85. self.is_success = False
  86. while retry_round < self.max_retry_round:
  87. try:
  88. rsp = await self.llm.chat(model=self.llm_name, messages=_messages, stream=self.stream)
  89. if self.stream:
  90. res = ""
  91. async for chunk in rsp:
  92. res += chunk
  93. yield ChatResponseStreamChoice(role='assistant', delta=chunk)
  94. yield ChatResponseStreamChoice(role='assistant', finish_reason='stop')
  95. else:
  96. res = rsp
  97. yield ChatResponseChoice(role='assistant', content=rsp)
  98. print(f'summary input: {query} \n summary output: {res}')
  99. pattern = r'<think>.*?</think>\n*'
  100. res = re.sub(pattern, '', res, flags=re.DOTALL)
  101. self.exec_res = res
  102. self.is_success = True
  103. break
  104. except Exception as e:
  105. traceback.print_exc()
  106. if self.stream:
  107. yield ChatResponseStreamChoice(role='assistant', finish_reason='flush')
  108. if isinstance(e, RemoteProtocolError):
  109. await asyncio.sleep(2 ** self.max_retry_round + 2 ** (self.max_retry_round - retry_round + 1))
  110. retry_round += 1
  111. def handle_prompt(self, plan_context: PlanResponseContextManager, prompt: str):
  112. plans = plan_context.plans
  113. for i, plan in enumerate(plans):
  114. if plan.action_name in dict.keys(agents_prompt):
  115. return prompt + agents_prompt[plan.action_name]
  116. return prompt