import asyncio import json from httpx import RemoteProtocolError from qwen_agent.llm.llm_client import LLMAsyncClient from qwen_agent.messages.context_message import ChatResponseChoice, ChatResponseStreamChoice from qwen_agent.planning.planner import PlanResponseContextManager from qwen_agent.planning.plans.doc_write_plan import DocWritePlan from qwen_agent.planning.plans.land_approval_plan import LandApprovalPlan from qwen_agent.planning.plans.land_supply_plan import LandSupplyPlan from qwen_agent.planning.plans.land_site_selection_plan import LandSiteSelectionPlan from qwen_agent.planning.plans.land_use_plan import LandUsePlan from qwen_agent.planning.plans.kfq_eval_plan import KfqEvalPlan from qwen_agent.planning.plans.report_plan import ReportPlan from qwen_agent.planning.plans.gis_plan import GisPlan from qwen_agent.sub_agent import ChartAgent from qwen_agent.sub_agent.ChatAgent import ChatAgent from qwen_agent.sub_agent.KnowledgeChatAgent import KnowledgeChatAgent from qwen_agent.planning.plans.land_find_plan import LandFindPlan from qwen_agent.planning.plans.layer_operation_plan import LayerOperationPlan BIDDING_PLANS = { "Chat": "如果用户的问题和自然资源的分析无关,可以选择闲聊接口和用户闲聊", "Gis": "gis图形相关的分析和arcgis server图层查询和空间分析,擅长进行图形的相交等空间叠加分析计算。如: 上传的shp与工业用地图层空间分析的相交结果", "KnowledgeChat": "如果用户的问题和自然资源的知识有关,可以选择知识库问答接口", "LandSiteSelectionPlan": "智能选址分析,如:请帮我推荐杭州市50亩左右的工业用地?", "LandFindPlan": "找图找数,如:请帮我查一下萧山区永久基本农田面积大于100亩的地块?", "LayerOperationPlan": "图层控制系统,如打开永久基本农田图层", "LandSupplyPlan": "土地供应合同分析,用于Question中包含了[一个具体的]区域名称选择、土地供应情况。如:请分析近几年杭州市住宅用地出让情况?", "LandUsePlan": "土地利用现状,用于Question中包含了[一个具体的]区域名称选择、土地利用现状情况,包括土地的耕地面积、湿地面积等。如:2022年浙江省土地利用现状情况?", "LandApprovalPlan": "土地报批项目,用于Question中包含了[一个具体的]区域名称选择、土地报批项目情况。如:瑞安市2023年报批项目总面积?", "KfqEvalPlan": "园区及开发区评价,用于Question中包含了[一个具体的]开发区名称选择、园区评价情况。如:2020年绩效评价得分最好的园区是哪个?", "ReportPlan": "分析报告写作专家,用于Question中需要生成分析报告。如:2023年瑞安市自然资源形势分析报告?", "DocWritePlan": "公文写作生成,用于根据Question生成对应的文章,并可以对文章进行润色、扩写、续写,还能检查文章的内容是否有问题或者是否包含敏感词" } PLAN_DICT = { "Chat": ChatAgent, "KnowledgeChat": KnowledgeChatAgent, "GisPlan": GisPlan, "LandSiteSelectionPlan": LandSiteSelectionPlan, "LandFindPlan": LandFindPlan, "LayerOperationPlan": LayerOperationPlan, "LandSupplyPlan": LandSupplyPlan, "KfqEvalPlan": KfqEvalPlan, "LandApprovalPlan": LandApprovalPlan, "LandUsePlan": LandUsePlan, "ReportPlan": ReportPlan, "DocWritePlan": DocWritePlan } PROMPT_TEMPLATE = """ 你是一个土地交易市场和房地产交易市场领域商业分析计划Plan的制定者,擅长制定招投标分析的计划,来完成用户商业分析的需求。 下面是用于满足用户不同需求的Agent,请从如下的Agent中选择一个,来执行用户Question: {plans_list} 请依据参考资料,制定计划完成用户需求,按照如下格式返回: Question: 用户针对招投标问题的提问 Thought: 生成Plan的思考过程,请简要进行分析 Plan Agent: 选择的Plan 下面是一些例子: Example #0: Question: 浙江省去年土地供应情况? Thought: 用户想要了解杭浙江省去年土地供应情况,调用 LandSupplyPlan 分析模块 Plan Agent: land_supply Example #1: Question: 杭州市去年土地交易情况? Thought: 用户想要了解杭州市去年土地交易情况,Question中“杭州市”是个地市名称,调用 LandSupplyPlan 分析模块 Plan Agent: LandSupplyPlan Example #2: Question: 你是谁? Thought: 用户该问题与招投标相关的分析无关,可以进入闲聊模式,使用Chat接口与用户闲聊; Plan Agent: Chat Example #3: Question: 什么是工业用地? Thought: 用户想要了解自然资源相关知识,Question中的意图是查询什么是工业用地,所以需要通过知识库进行回答,应该使用KnowledgeChat模块; Plan Agent: Competition Example #4: Question: 工业用地竞买流程? Thought: 用户想要了解土地拍卖流程相关知识,Question中的意图是查询工业用地的竞买流程,所以需要通过知识库进行回答,应该使用KnowledgeChat模块; Plan Agent: Competition Example #5: Question: 浙江省2022年土地利用现状总面积是多少? Thought: 用户想要了解杭浙江省2022年土地利用现状情况,调用 LandUsePlan 分析模块 Plan Agent: land_use Example #6: Question: 请分析浙江省2022年各地级市耕地面积,并绘制柱状图 Thought: 用户想要分析浙江省2022年各地级市耕地面积,并绘制柱状图,调用 LandUsePlan 分析模块 Plan Agent: LandUsePlan Example #7: Question: 帮我分析下上传的shp和供地图层的相交结果 Thought: 用户想要分析下上传的shp和供地图层的相交结果,调用 GisPlan 分析模块 Plan Agent: GisPlan Example #8: Question: 2023年瑞安市自然资源形势分析报告 Thought: 用户想要编写2023年瑞安市自然资源形势分析报告,调用 ReportPlan 分析模块 Plan Agent: ReportPlan Example #9: Question: 公文生成,对文字进行润色、扩写、续写,检查文章的内容是否有错误、是否包含敏感词等 Thought: 用户想要使用文章相关的功能,调用 DocWritePlan 分析模块 Plan Agent: DocWritePlan Example #10: Question: 请帮我在西湖区找出面积最大的商服用地,数据表是公告地块 Thought: 用户想要从公告地块表种进行选址分析,调用 LandSiteSelectionPlan 分析模块 Plan Agent: LandSiteSelectionPlan Example #11: Question: 帮我在萧山区找出面积大于100亩的永久基本农田图斑 Thought: 用户想要找出永久基本农田地块,调用 LandFindPlan 分析模块 Plan Agent: LandFindPlan Example #12: Question: 帮我打开永久基本农田图层 Thought: 用户想要打开永久基本农田图层,调用 LayerOperationPlan 分析模块 Plan Agent: LayerOperationPlan 注意: 1.Plan Agent 返回的都是单一的,不要出现多个plan,不要出现多个plan, 比如以下情况:Plan Agent: LandUsePlan, ReportPlan """ INSTRUCTION = """ 现在用户的Question是: {user_request} 下面请你按照上面的格式,选择合理的分析规划师。 """ class PlanDispatcher: def __init__(self, llm_name, llm_dict, llm=None, stream=True, name='plan_dispatcher', max_retry_cnt=3): # self.actions_list_str = json.dumps(plan_list, ensure_ascii=False) self.llm: LLMAsyncClient = llm self.llm_name = llm_name self.stream = stream self.name = name self.max_retry_cnt = max_retry_cnt self.llm_dict = llm_dict async def run(self, plan_context: PlanResponseContextManager, messages=None): user_request = plan_context.user_request system_prompt = PROMPT_TEMPLATE.format( plans_list=json.dumps(BIDDING_PLANS, ensure_ascii=False) ) instruction = INSTRUCTION.format(user_request=user_request) _messages = [{ 'role': 'system', 'content': system_prompt }] if messages: _messages.extend(messages) _messages.append({ "role": "user", "content": instruction }) # for msg in _messages: for i, msg in enumerate(_messages): if not isinstance(msg, dict): msg = dict(msg) if msg['type'].value == 1: msg['role'] = 'user' msg['content'] = msg['data'] else: msg['role'] = 'assistant' msg['content'] = dict(msg['data'])['exec_res'][0] msg['history'] = True del msg['data'] del msg['type'] _messages[i] = msg if 'history' in msg and msg['history']: print('is history messsage') else: yield ChatResponseChoice(role=msg['role'], content=msg['content']) retry_cnt = self.max_retry_cnt while True: try: rep = await self.llm.chat(model=self.llm_name, messages=_messages, stream=self.stream) # await asyncio.sleep(0.1) if self.stream: res = '' async for chunk in rep: if chunk: yield ChatResponseStreamChoice(role='assistant', delta=chunk) res += chunk yield ChatResponseStreamChoice(role='assistant', finish_reason='stop') else: yield ChatResponseChoice(role='assistant', content=rep) res = rep _messages.append({ 'role': 'assistant', 'content': res }) print('plan dispatcher:', res) planner_name = res.split('Plan Agent:')[-1].split('\n')[0].strip() if planner_name not in PLAN_DICT.keys(): planner_name = 'Chat' llm_name = self.llm_dict.get(planner_name) or self.llm_dict.get("planner") or self.llm_name planner = PLAN_DICT[planner_name](llm_name=llm_name, llm=self.llm, stream=self.stream) break except Exception as e: import traceback traceback.print_exc() print(f'{type(e)}, {isinstance(e, RemoteProtocolError)}') if self.stream: yield ChatResponseStreamChoice(role='assistant', finish_reason='flush') if isinstance(e, RemoteProtocolError): await asyncio.sleep(2 ** self.max_retry_cnt + 2 ** (self.max_retry_cnt - retry_cnt + 1)) else: user_msg = { 'role': 'user', 'content': f"生成信息有误,请按照上面的格式重新生成,Plan Agent必须是{list(PLAN_DICT.keys())}中的一个", } _messages.append(user_msg) yield ChatResponseChoice(**user_msg) retry_cnt -= 1 if retry_cnt <= 0: raise Exception(f'plan dispatcher run failed: {_messages}') self.planner = planner self.exec_res = res if __name__ == '__main__': plan = PlanDispatcher() plan.run("浙江省招商局总局今年招标的产品主要有哪些")