summary_agent.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import asyncio
  2. import traceback
  3. from typing import List
  4. from httpx import RemoteProtocolError
  5. from qwen_agent.planning.planner import PlanResponseContextManager
  6. from qwen_agent.sub_agent.BaseSubAgent import BaseSubAgent
  7. from qwen_agent.messages.context_message import ChatResponseChoice, ChatResponseStreamChoice
  8. SYSTEM_PROMPT = """
  9. 你是一个对前面提到的分析过程和结果进行总结摘要的专家,给你一段用户的提问,以及之前代码执行的过程和执行结果,你可以对整个分析的过程进行总结摘要,来回答用户提出的问题。
  10. 那么我问你,请对前面提到的分析过程和结果进行总结摘要,回答用户的问题。
  11. 注意:
  12. 1. 如果用户查询的Question通过数据库查询,没有返回结果,请直接回答“数据库中没有查询到相关的数据”,不允许胡编乱造。如果在数据库中查询到相关的结果,请根据结果回答用户问题。
  13. 2. 请不要对show_case的查询到的记录做过与详细的描述;
  14. 3. 如果用户查询的Question没有设计到数据库查询,请直接回答没有相关数据。
  15. """
  16. agents_prompt = dict({
  17. 'LandSupplySqlAgent': '返回的数据结果面积单位是亩,金额单位是万元, 楼面价和土地单价的单位是万元/平方米',
  18. 'LandUseSqlAgent': """
  19. 返回的数据结果面积单位是公顷,不要对其进行转换,不要对其四舍五入
  20. 各一级类包含以下二级类:
  21. 湿地:包含以下几种土地利用现状小类 红树林地、森林沼泽、灌丛沼泽、沼泽草地、沿海滩涂、内陆滩涂和沼泽地,
  22. 耕地:包含以下几种土地利用现状小类 水田,水浇地和旱地,
  23. 园林:包含以下几种土地利用现状小类 果园,茶园,橡胶园和其他园地
  24. 林地:包含以下几种土地利用现状小类 乔木林地,竹林地,灌木林地和其他林地
  25. 草地:包含以下几种土地利用现状小类 天然牧草地,人工牧草地和其他草地
  26. 城镇村及工矿用地:包含以下几种土地利用现状小类 城市用地,建制镇用地,村庄用地,采矿用地和风景名胜及特殊用地
  27. 交通运输用地:包含以下几种土地利用现状小类 铁路用地,轨道交通用地,公路用地,农村道路,机场用地,港口码头用地和管道运输用地
  28. 水域及水利设施用地:包含以下几种土地利用现状小类 河流水面,湖泊水面,水库水面,坑塘水面,沟渠,水工建筑用地和冰川及常年积雪
  29. """,
  30. 'LandApprovalSqlAgent': """
  31. 返回的数据结果面积单位是公顷
  32. 注意问题中的近几年指的是当前年份往前推几年,例如,近5年,指的是当前年份往前推5年,即,2020年,2021年,2022年,2023年 和当前年份 2024年
  33. """,
  34. 'SpatialAnalysisAgent': """
  35. 1. 总结时不要输出图形的wkt信息或者其他坐标点信息
  36. """,
  37. 'GisLayerOperationAgent':"""
  38. 总结操作的是哪个图层
  39. """,
  40. 'LandSiteSelectionSqlAgent': """
  41. 必须按照markdown格式输出的地块信息。以下是输出信息的参考,请将<>替换成真实的内容:
  42. ### 1.<地块名称>
  43. ### 2.<地块名称>
  44. 下面是输出时的注意事项:
  45. 1.生成markdown 注意必须严格使用换行符,不可章节之间出现没有换行符的情况
  46. 2.必须严格按照上面的结构输出信息
  47. 3.不要输出除上面结构之外的信息
  48. """,
  49. 'GisSurroundingFacilitiesQueryAgent': """
  50. 1. 问题中涉及交通便利性,需要结合返回数据中字段中key 为 "way"的值进行解答,不准胡编乱造,如果数据都不符合交通便利的要求,则提示用户该数据方圆5公里内无任何道路信息
  51. 2. 需要对用户提问中的数据进行提炼,对符合问题周边分析的数据进行总结,如:用户希望获取离港口近的地块,则对分析数据中哪个地块中的港口离距离近进行总结
  52. 3. 总结时必须要对筛选后的地块的周边信息进行介绍,对于不符合问题要求的周边信息的地块不进行介绍
  53. """,
  54. 'KfqEvalSqlAgent': """
  55. """
  56. })
  57. class SummaryAgent(BaseSubAgent):
  58. def __init__(self, llm=None, llm_name=None, stream=False, name='summary'):
  59. super().__init__(llm=llm, llm_name=llm_name, stream=stream, name=name)
  60. async def run(self, plan_context: PlanResponseContextManager, messages: List[str]):
  61. query = plan_context.get_summary_context()
  62. _messages = [{
  63. "role": "system",
  64. "content": self.handle_prompt(plan_context, SYSTEM_PROMPT)
  65. }]
  66. _messages.extend(messages)
  67. _messages.append({
  68. 'role': 'user',
  69. 'content': query
  70. })
  71. # print(f"query of summary agent: {query}")
  72. # for msg in _messages:
  73. for i, msg in enumerate(_messages):
  74. if not isinstance(msg, dict):
  75. msg = dict(msg)
  76. if msg['type'].value == 1:
  77. msg['role'] = 'user'
  78. msg['content'] = msg['data']
  79. else:
  80. msg['role'] = 'assistant'
  81. msg['content'] = dict(msg['data'])['exec_res'][0]
  82. msg['history'] = True
  83. del msg['data']
  84. del msg['type']
  85. _messages[i] = msg
  86. if 'history' in msg and msg['history']:
  87. print('is history messsage')
  88. else:
  89. yield ChatResponseChoice(role=msg['role'], content=msg['content'])
  90. retry_round = 0
  91. self.exec_res = 'Summary error, please try again...'
  92. self.is_success = False
  93. while retry_round < self.max_retry_round:
  94. try:
  95. rsp = await self.llm.chat(model=self.llm_name, messages=_messages, stream=self.stream)
  96. if self.stream:
  97. res = ""
  98. async for chunk in rsp:
  99. res += chunk
  100. yield ChatResponseStreamChoice(role='assistant', delta=chunk)
  101. yield ChatResponseStreamChoice(role='assistant', finish_reason='stop')
  102. else:
  103. res = rsp
  104. yield ChatResponseChoice(role='assistant', content=rsp)
  105. print(f'summary input: {query} \n summary output: {res}')
  106. self.exec_res = res
  107. self.is_success = True
  108. break
  109. except Exception as e:
  110. traceback.print_exc()
  111. if self.stream:
  112. yield ChatResponseStreamChoice(role='assistant', finish_reason='flush')
  113. if isinstance(e, RemoteProtocolError):
  114. await asyncio.sleep(2 ** self.max_retry_round + 2 ** (self.max_retry_round - retry_round + 1))
  115. retry_round += 1
  116. def handle_prompt(self, plan_context: PlanResponseContextManager, prompt: str):
  117. plans = plan_context.plans
  118. for i, plan in enumerate(plans):
  119. if plan.action_name in dict.keys(agents_prompt):
  120. return prompt + agents_prompt[plan.action_name]
  121. return prompt