import json from qwen_agent.planning.planner import Planner from qwen_agent.memory.plan_memory import PlanExampleRetrieval actions_list = { "GisGeocoderAgent": "用于获取用户问题中详细地址的坐标点", "LandFindSqlAgent": "用于查询永久基本农田表的Agent", "summary": "对表查询的结果,进行总结摘要,提炼用户关注的信息", } PROMPT_TEMPLATE = """ 你是一个土地交易市场分析助手,擅长土地交易信息查询与分析,来完成用户区域土地交易情况分析的需求。 下面是动作选项列表,可以选择有助于完成用户需求的一个或多个动作: 动作列表 {actions_list} 请依据参考资料,制定计划完成用户需求,按照如下格式返回: Question: 用户针区域土地交易问题的提问 Thought: 生成Plan的思考过程,如果提到多个区域名称,需要多次调用[LandFindSqlAgent]接口,给用户一些依据。 Plan: 生成的计划,包含函数名和执行的目标。以JSON格式返回,所有的Action会以执行的先后顺序保存在list中,例如: [{{"action_name": 第一步执行的Action名字, "instruction": 执行需要达到的预期,请给出详细有效的指示}}, {{"action_name": 第二步执行的Action名字, "instruction": 执行需要达到的预期,请给出详细有效的指示}}] 举例: {example_list} 注意0:如果用户需要生成图表,那么最后一步请调用[generate_chart]这个action,针对用户Question和查询数据库结果生成options; 注意1:最后一步必须调用[summary]这个action,针对用户Question和查询数据库结果进行总结摘要,提取用户关注信息; 注意2:如果出现SUM函数,必须使用:: FLOAT将返回结果转换为float类型 """ examples = """ ``` """ DEFAULT_PLAN = """使用默认执行计划: Plan: [{"action_name": "LandFindSqlAgent", "instruction": "你需要从数据库查询土地表,统计Question中需要的信息"}, {"action_name": "summary","instruction": "根据以上的结果,你需要对结果进行总结分析,回答用户Question"}] """ class LandFindPlan(Planner): def __init__(self, llm_name, name='land_find', **kvargs): self.actions_list_str = json.dumps(actions_list, ensure_ascii=False) # retriever = PlanExampleRetrieval(query_type=name) super(LandFindPlan, self).__init__( llm_name=llm_name, name=f'{name}_planner', system_prompt=PROMPT_TEMPLATE, retriever=PlanExampleRetrieval(query_type='land_find'), default_plan=DEFAULT_PLAN, **kvargs ) def get_system_prompt(self, user_request): similar_exams = self.retriever.get_relevant_documents(user_request, top_k=3) examples = '\n'.join([f"Example #{i + 1}:\n{q}" for i, (n, q) in enumerate(similar_exams[::-1])]) # print(f'user request:{user_request} \nRetrivaled Examples: {examples}\n') return PROMPT_TEMPLATE.format(actions_list=self.actions_list_str, example_list=examples)