import asyncio import traceback from typing import List from httpx import RemoteProtocolError from qwen_agent.planning.planner import PlanResponseContextManager from qwen_agent.sub_agent.BaseSubAgent import BaseSubAgent from qwen_agent.messages.context_message import ChatResponseChoice, ChatResponseStreamChoice import re SYSTEM_PROMPT = """ 你是一个对前面提到的分析过程和结果进行总结摘要的专家,给你一段用户的提问,以及之前代码执行的过程和执行结果,你可以对整个分析的过程进行总结摘要,来回答用户提出的问题。 那么我问你,请对前面提到的分析过程和结果进行总结摘要,回答用户的问题。 注意: 1. 如果用户查询的Question通过数据库查询,没有返回结果,请直接回答“数据库中没有查询到相关的数据”,不允许胡编乱造。如果在数据库中查询到相关的结果,请根据结果回答用户问题。 2. 请不要对show_case的查询到的记录做过与详细的描述; 3. 如果用户查询的Question没有涉及到数据库查询,比如GIS图层操作,请直接回答没有相关数据。 """ agents_prompt = dict({ 'LandSupplySqlAgent': '返回的数据结果面积单位是亩,金额单位是万元, 楼面价和土地单价的单位是万元/平方米', 'LandUseSqlAgent': """ 返回的数据结果面积单位是公顷,不要对其进行转换,不要对其四舍五入 各一级类包含以下二级类: 湿地:包含以下几种土地利用现状小类 红树林地、森林沼泽、灌丛沼泽、沼泽草地、沿海滩涂、内陆滩涂和沼泽地, 耕地:包含以下几种土地利用现状小类 水田,水浇地和旱地, 园林:包含以下几种土地利用现状小类 果园,茶园,橡胶园和其他园地 林地:包含以下几种土地利用现状小类 乔木林地,竹林地,灌木林地和其他林地 草地:包含以下几种土地利用现状小类 天然牧草地,人工牧草地和其他草地 城镇村及工矿用地:包含以下几种土地利用现状小类 城市用地,建制镇用地,村庄用地,采矿用地和风景名胜及特殊用地 交通运输用地:包含以下几种土地利用现状小类 铁路用地,轨道交通用地,公路用地,农村道路,机场用地,港口码头用地和管道运输用地 水域及水利设施用地:包含以下几种土地利用现状小类 河流水面,湖泊水面,水库水面,坑塘水面,沟渠,水工建筑用地和冰川及常年积雪 """, 'SpatialAnalysisAgent': """ 1. 总结时不要输出图形的wkt信息或者其他坐标点信息 """, 'LandFindSqlAgent':""" 不对结果进行总结,请直接回答没有相关数据 """, 'GisLayerOperationAgent':""" 不对结果进行总结,请直接回答没有相关数据 """, 'LandSiteSelectionSqlAgent': """ 必须按照markdown格式输出的地块信息。以下是输出信息的参考,请将<>替换成真实的内容: ### 1.<地块名称> ### 2.<地块名称> 下面是输出时的注意事项: 1.生成markdown 注意必须严格使用换行符,不可章节之间出现没有换行符的情况 2.必须严格按照上面的结构输出信息 3.不要输出除上面结构之外的信息 """, }) class SummaryAgent(BaseSubAgent): def __init__(self, llm=None, llm_name=None, stream=False, name='summary'): super().__init__(llm=llm, llm_name=llm_name, stream=stream, name=name) async def run(self, plan_context: PlanResponseContextManager, messages: List[str]): query = plan_context.get_summary_context() _messages = [{ "role": "system", "content": self.handle_prompt(plan_context, SYSTEM_PROMPT) }] _messages.extend(messages) _messages.append({ 'role': 'user', 'content': query }) # print(f"query of summary agent: {query}") # for msg in _messages: for i, msg in enumerate(_messages): if not isinstance(msg, dict): msg = dict(msg) if msg['type'].value == 1: msg['role'] = 'user' msg['content'] = msg['data'] else: msg['role'] = 'assistant' msg['content'] = dict(msg['data'])['exec_res'][0] msg['history'] = True del msg['data'] del msg['type'] _messages[i] = msg if 'history' in msg and msg['history']: print('is history messsage') else: yield ChatResponseChoice(role=msg['role'], content=msg['content']) retry_round = 0 self.exec_res = 'Summary error, please try again...' self.is_success = False while retry_round < self.max_retry_round: try: rsp = await self.llm.chat(model=self.llm_name, messages=_messages, stream=self.stream) if self.stream: res = "" async for chunk in rsp: res += chunk yield ChatResponseStreamChoice(role='assistant', delta=chunk) yield ChatResponseStreamChoice(role='assistant', finish_reason='stop') else: res = rsp yield ChatResponseChoice(role='assistant', content=rsp) print(f'summary input: {query} \n summary output: {res}') pattern = r'.*?\n*' res = re.sub(pattern, '', res, flags=re.DOTALL) self.exec_res = res self.is_success = True break except Exception as e: traceback.print_exc() if self.stream: yield ChatResponseStreamChoice(role='assistant', finish_reason='flush') if isinstance(e, RemoteProtocolError): await asyncio.sleep(2 ** self.max_retry_round + 2 ** (self.max_retry_round - retry_round + 1)) retry_round += 1 def handle_prompt(self, plan_context: PlanResponseContextManager, prompt: str): plans = plan_context.plans for i, plan in enumerate(plans): if plan.action_name in dict.keys(agents_prompt): return prompt + agents_prompt[plan.action_name] return prompt