plan_dispatcher.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. import asyncio
  2. import json
  3. from httpx import RemoteProtocolError
  4. from qwen_agent.llm.llm_client import LLMAsyncClient
  5. from qwen_agent.messages.context_message import ChatResponseChoice, ChatResponseStreamChoice
  6. from qwen_agent.planning.planner import PlanResponseContextManager
  7. from qwen_agent.planning.plans.doc_write_plan import DocWritePlan
  8. from qwen_agent.planning.plans.land_approval_plan import LandApprovalPlan
  9. from qwen_agent.planning.plans.land_supply_plan import LandSupplyPlan
  10. from qwen_agent.planning.plans.land_site_selection_plan import LandSiteSelectionPlan
  11. from qwen_agent.planning.plans.land_use_plan import LandUsePlan
  12. from qwen_agent.planning.plans.kfq_eval_plan import KfqEvalPlan
  13. from qwen_agent.planning.plans.report_plan import ReportPlan
  14. from qwen_agent.planning.plans.gis_plan import GisPlan
  15. from qwen_agent.sub_agent import ChartAgent
  16. from qwen_agent.sub_agent.ChatAgent import ChatAgent
  17. from qwen_agent.sub_agent.KnowledgeChatAgent import KnowledgeChatAgent
  18. from qwen_agent.planning.plans.land_find_plan import LandFindPlan
  19. from qwen_agent.planning.plans.layer_operation_plan import LayerOperationPlan
  20. BIDDING_PLANS = {
  21. "Chat": "如果用户的问题和自然资源的分析无关,可以选择闲聊接口和用户闲聊",
  22. "Gis": "gis图形相关的分析和arcgis server图层查询和空间分析,擅长进行图形的相交等空间叠加分析计算。如: 上传的shp与工业用地图层空间分析的相交结果",
  23. "KnowledgeChat": "如果用户的问题和自然资源的知识有关,可以选择知识库问答接口",
  24. "LandSiteSelectionPlan": "智能选址分析,如:请帮我推荐杭州市50亩左右的工业用地?",
  25. "LandFindPlan": "找图找数,如:请帮我查一下萧山区永久基本农田面积大于100亩的地块?",
  26. "LayerOperationPlan": "图层控制系统,如打开永久基本农田图层",
  27. "LandSupplyPlan": "土地供应合同分析,用于Question中包含了[一个具体的]区域名称选择、土地供应情况。如:请分析近几年杭州市住宅用地出让情况?",
  28. "LandUsePlan": "土地利用现状,用于Question中包含了[一个具体的]区域名称选择、土地利用现状情况,包括土地的耕地面积、湿地面积等。如:2022年浙江省土地利用现状情况?",
  29. "LandApprovalPlan": "土地报批项目,用于Question中包含了[一个具体的]区域名称选择、土地报批项目情况。如:瑞安市2023年报批项目总面积?",
  30. "KfqEvalPlan": "园区及开发区评价,用于Question中包含了[一个具体的]开发区名称选择、园区评价情况。如:2020年绩效评价得分最好的园区是哪个?",
  31. "ReportPlan": "分析报告写作专家,用于Question中需要生成分析报告。如:2023年瑞安市自然资源形势分析报告?",
  32. "DocWritePlan": "公文写作生成,用于根据Question生成对应的文章,并可以对文章进行润色、扩写、续写,还能检查文章的内容是否有问题或者是否包含敏感词"
  33. }
  34. PLAN_DICT = {
  35. "Chat": ChatAgent,
  36. "KnowledgeChat": KnowledgeChatAgent,
  37. "GisPlan": GisPlan,
  38. "LandSiteSelectionPlan": LandSiteSelectionPlan,
  39. "LandFindPlan": LandFindPlan,
  40. "LayerOperationPlan": LayerOperationPlan,
  41. "LandSupplyPlan": LandSupplyPlan,
  42. "KfqEvalPlan": KfqEvalPlan,
  43. "LandApprovalPlan": LandApprovalPlan,
  44. "LandUsePlan": LandUsePlan,
  45. "ReportPlan": ReportPlan,
  46. "DocWritePlan": DocWritePlan
  47. }
  48. PROMPT_TEMPLATE = """
  49. 你是一个土地交易市场和房地产交易市场领域商业分析计划Plan的制定者,擅长制定招投标分析的计划,来完成用户商业分析的需求。
  50. 下面是用于满足用户不同需求的Agent,请从如下的Agent中选择一个,来执行用户Question:
  51. {plans_list}
  52. 请依据参考资料,制定计划完成用户需求,按照如下格式返回:
  53. Question: 用户针对招投标问题的提问
  54. Thought: 生成Plan的思考过程,请简要进行分析
  55. Plan Agent: 选择的Plan
  56. 下面是一些例子:
  57. Example #0:
  58. Question: 浙江省去年土地供应情况?
  59. Thought: 用户想要了解杭浙江省去年土地供应情况,调用 LandSupplyPlan 分析模块
  60. Plan Agent: land_supply
  61. Example #1:
  62. Question: 杭州市去年土地交易情况?
  63. Thought: 用户想要了解杭州市去年土地交易情况,Question中“杭州市”是个地市名称,调用 LandSupplyPlan 分析模块
  64. Plan Agent: LandSupplyPlan
  65. Example #2:
  66. Question: 你是谁?
  67. Thought: 用户该问题与招投标相关的分析无关,可以进入闲聊模式,使用Chat接口与用户闲聊;
  68. Plan Agent: Chat
  69. Example #3:
  70. Question: 什么是工业用地?
  71. Thought: 用户想要了解自然资源相关知识,Question中的意图是查询什么是工业用地,所以需要通过知识库进行回答,应该使用KnowledgeChat模块;
  72. Plan Agent: Competition
  73. Example #4:
  74. Question: 工业用地竞买流程?
  75. Thought: 用户想要了解土地拍卖流程相关知识,Question中的意图是查询工业用地的竞买流程,所以需要通过知识库进行回答,应该使用KnowledgeChat模块;
  76. Plan Agent: Competition
  77. Example #5:
  78. Question: 浙江省2022年土地利用现状总面积是多少?
  79. Thought: 用户想要了解杭浙江省2022年土地利用现状情况,调用 LandUsePlan 分析模块
  80. Plan Agent: land_use
  81. Example #6:
  82. Question: 请分析浙江省2022年各地级市耕地面积,并绘制柱状图
  83. Thought: 用户想要分析浙江省2022年各地级市耕地面积,并绘制柱状图,调用 LandUsePlan 分析模块
  84. Plan Agent: LandUsePlan
  85. Example #7:
  86. Question: 帮我分析下上传的shp和供地图层的相交结果
  87. Thought: 用户想要分析下上传的shp和供地图层的相交结果,调用 GisPlan 分析模块
  88. Plan Agent: GisPlan
  89. Example #8:
  90. Question: 2023年瑞安市自然资源形势分析报告
  91. Thought: 用户想要编写2023年瑞安市自然资源形势分析报告,调用 ReportPlan 分析模块
  92. Plan Agent: ReportPlan
  93. Example #9:
  94. Question: 公文生成,对文字进行润色、扩写、续写,检查文章的内容是否有错误、是否包含敏感词等
  95. Thought: 用户想要使用文章相关的功能,调用 DocWritePlan 分析模块
  96. Plan Agent: DocWritePlan
  97. Example #10:
  98. Question: 请帮我在西湖区找出面积最大的商服用地,数据表是公告地块
  99. Thought: 用户想要从公告地块表种进行选址分析,调用 LandSiteSelectionPlan 分析模块
  100. Plan Agent: LandSiteSelectionPlan
  101. Example #11:
  102. Question: 帮我在萧山区找出面积大于100亩的永久基本农田图斑
  103. Thought: 用户想要找出永久基本农田地块,调用 LandFindPlan 分析模块
  104. Plan Agent: LandFindPlan
  105. Example #12:
  106. Question: 帮我打开永久基本农田图层
  107. Thought: 用户想要打开永久基本农田图层,调用 LayerOperationPlan 分析模块
  108. Plan Agent: LayerOperationPlan
  109. 注意:
  110. 1.Plan Agent 返回的都是单一的,不要出现多个plan,不要出现多个plan, 比如以下情况:Plan Agent: LandUsePlan, ReportPlan
  111. """
  112. INSTRUCTION = """
  113. 现在用户的Question是: {user_request}
  114. 下面请你按照上面的格式,选择合理的分析规划师。
  115. """
  116. class PlanDispatcher:
  117. def __init__(self, llm_name, llm_dict, llm=None, stream=True, name='plan_dispatcher', max_retry_cnt=3):
  118. # self.actions_list_str = json.dumps(plan_list, ensure_ascii=False)
  119. self.llm: LLMAsyncClient = llm
  120. self.llm_name = llm_name
  121. self.stream = stream
  122. self.name = name
  123. self.max_retry_cnt = max_retry_cnt
  124. self.llm_dict = llm_dict
  125. async def run(self, plan_context: PlanResponseContextManager, messages=None):
  126. user_request = plan_context.user_request
  127. system_prompt = PROMPT_TEMPLATE.format(
  128. plans_list=json.dumps(BIDDING_PLANS, ensure_ascii=False)
  129. )
  130. instruction = INSTRUCTION.format(user_request=user_request)
  131. _messages = [{
  132. 'role': 'system',
  133. 'content': system_prompt
  134. }]
  135. if messages:
  136. _messages.extend(messages)
  137. _messages.append({
  138. "role": "user",
  139. "content": instruction
  140. })
  141. # for msg in _messages:
  142. for i, msg in enumerate(_messages):
  143. if not isinstance(msg, dict):
  144. msg = dict(msg)
  145. if msg['type'].value == 1:
  146. msg['role'] = 'user'
  147. msg['content'] = msg['data']
  148. else:
  149. msg['role'] = 'assistant'
  150. msg['content'] = dict(msg['data'])['exec_res'][0]
  151. msg['history'] = True
  152. del msg['data']
  153. del msg['type']
  154. _messages[i] = msg
  155. if 'history' in msg and msg['history']:
  156. print('is history messsage')
  157. else:
  158. yield ChatResponseChoice(role=msg['role'], content=msg['content'])
  159. retry_cnt = self.max_retry_cnt
  160. while True:
  161. try:
  162. rep = await self.llm.chat(model=self.llm_name, messages=_messages, stream=self.stream)
  163. # await asyncio.sleep(0.1)
  164. if self.stream:
  165. res = ''
  166. async for chunk in rep:
  167. if chunk:
  168. yield ChatResponseStreamChoice(role='assistant', delta=chunk)
  169. res += chunk
  170. yield ChatResponseStreamChoice(role='assistant', finish_reason='stop')
  171. else:
  172. yield ChatResponseChoice(role='assistant', content=rep)
  173. res = rep
  174. _messages.append({
  175. 'role': 'assistant',
  176. 'content': res
  177. })
  178. print('plan dispatcher:', res)
  179. planner_name = res.split('Plan Agent:')[-1].split('\n')[0].strip()
  180. if planner_name not in PLAN_DICT.keys():
  181. planner_name = 'Chat'
  182. llm_name = self.llm_dict.get(planner_name) or self.llm_dict.get("planner") or self.llm_name
  183. planner = PLAN_DICT[planner_name](llm_name=llm_name, llm=self.llm, stream=self.stream)
  184. break
  185. except Exception as e:
  186. import traceback
  187. traceback.print_exc()
  188. print(f'{type(e)}, {isinstance(e, RemoteProtocolError)}')
  189. if self.stream:
  190. yield ChatResponseStreamChoice(role='assistant', finish_reason='flush')
  191. if isinstance(e, RemoteProtocolError):
  192. await asyncio.sleep(2 ** self.max_retry_cnt + 2 ** (self.max_retry_cnt - retry_cnt + 1))
  193. else:
  194. user_msg = {
  195. 'role': 'user',
  196. 'content': f"生成信息有误,请按照上面的格式重新生成,Plan Agent必须是{list(PLAN_DICT.keys())}中的一个",
  197. }
  198. _messages.append(user_msg)
  199. yield ChatResponseChoice(**user_msg)
  200. retry_cnt -= 1
  201. if retry_cnt <= 0:
  202. raise Exception(f'plan dispatcher run failed: {_messages}')
  203. self.planner = planner
  204. self.exec_res = res
  205. if __name__ == '__main__':
  206. plan = PlanDispatcher()
  207. plan.run("浙江省招商局总局今年招标的产品主要有哪些")