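"""Base sub-agent for qwen_agent: builds a ReAct-style tool prompt, parses the
model's Action / Action Input reply, executes the matching tool, and retries
with error feedback until the call succeeds or the retry budget runs out."""
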
import asyncio
import copy
import json
import re
import traceback
from typing import Optional

from httpx import RemoteProtocolError

from qwen_agent.llm.llm_client import LLMClient
from qwen_agent.messages.context_message import ChatResponseChoice, ChatResponseStreamChoice


class BaseSubAgent:
    """Base class for sub-agents: renders a tool-aware system prompt, runs the
    ReAct plan/act/observe loop, and delegates tool execution to run_function."""

    def __init__(self, llm: Optional[LLMClient] = None, llm_name=None, stream=False,
                 name='base_agent', max_retry_round=3):
        self.llm = llm
        # Fall back to the client's configured model when no explicit name is given.
        self.llm_name = llm_name if llm_name else llm.model
        self.stream = stream
        self.is_success = False
        self.empty_data = False
        self.tool_list = []
        self.max_retry_round = max_retry_round
        self.REACT_INSTRUCTION = """
# API description
{tools_text}

Following the API descriptions above, make a plan that fulfills the user's request and reply in this format:
Thought: the reasoning behind the plan.
Action: the API to use now; it must be one of [{tools_name_text}]. Put only the API name (name_for_model) here, nothing else.
Action Input: the input parameters for the API. Put only the API parameters (parameters) as JSON here, nothing else.
Final: the conclusion of the reasoning above.
"""
        self.TOOL_DESC = """{name_for_model}: Call this tool to interact with the {name_for_human} API. What is the {name_for_human} API useful for? {description_for_model} Parameters: {parameters}"""
        self.SubAgent_PROMPT = ''
        self.name = name

    def gen_system_prompt(self, functions):
        """Render the system prompt: one TOOL_DESC block per tool, followed by
        the ReAct instruction listing the legal Action names."""
        if functions:
            tools_text = []
            tools_name_text = []
            for func_info in functions:
                name = func_info.get("name", "")
                name_m = func_info.get("name_for_model", name)
                name_h = func_info.get("name_for_human", name)
                desc = func_info.get("description", "")
                desc_m = func_info.get("description_for_model", desc)
                # Hint: format requirements can be appended to the description,
                # e.g. "Format the arguments as a JSON object." or "Enclose the
                # code within triple backticks (`) at the beginning and end."
                tool = self.TOOL_DESC.format(
                    name_for_model=name_m,
                    name_for_human=name_h,
                    description_for_model=desc_m,
                    parameters=json.dumps(func_info["parameters"], ensure_ascii=False),
                )
                tools_text.append(tool)
                tools_name_text.append(name_m)
            tools_text = "\n\n".join(tools_text)
            tools_name_text = ", ".join(tools_name_text)
            system = self.SubAgent_PROMPT + "\n\n" + self.REACT_INSTRUCTION.format(
                tools_text=tools_text,
                tools_name_text=tools_name_text,
            )
            return system.lstrip("\n").rstrip()
        # No tools registered: fall back to the bare sub-agent prompt.
        return self.SubAgent_PROMPT

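    # Example (hypothetical tool spec, for illustration only): with
    #   self.tool_list = [{"name_for_model": "code_interpreter",
    #                      "name_for_human": "Code Interpreter",
    #                      "description_for_model": "Runs Python code.",
    #                      "parameters": [{"name": "code", "type": "string"}]}]
    # gen_system_prompt(self.tool_list) renders one TOOL_DESC line for the tool,
    # then the ReAct instruction whose legal Action list is "code_interpreter".
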
    def parse_response_func(self, response):
        """Extract a function call from a ReAct-formatted reply; the returned
        dict's 'function_call' entry is None when no Action is found."""
        func_name, func_args = "", ""
        i = response.find("Action:")
        j = response.find("\nAction Input:")
        k = response.find("\nFinal:")
        if 0 <= i < j:  # The text has both `Action:` and `Action Input:`.
            func_name = response[i + len("Action:"):j].strip()
            if k > 0:
                func_args = response[j + len("\nAction Input:"):k].strip()
            else:
                func_args = response[j + len("\nAction Input:"):].strip()
        if func_name:
            return {
                'role': "assistant",
                'content': response[:i],
                'function_call': {"name": func_name, "arguments": func_args},
            }
        return {'function_call': None, 'content': response}
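
    # Example: for a reply such as
    #   'Thought: run it.\nAction: echo\nAction Input: {"text": "hi"}\nFinal: done.'
    # parse_response_func returns
    #   {'role': 'assistant', 'content': 'Thought: run it.\n',
    #    'function_call': {'name': 'echo', 'arguments': '{"text": "hi"}'}}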

    def parse_response(self, rsp, prefix='python'):
        """Normalize a raw reply into a message dict with a 'function_call'
        entry. If no Action was parsed, fall back to the first fenced code
        block and route it to the first registered tool."""
        if isinstance(rsp, str):
            rsp = self.parse_response_func(rsp)
        if rsp is not None and rsp['function_call'] is None:
            rsp['function_call'] = {'name': None, 'arguments': None}
            triple_match = re.search(r'```[^\n]*\n(.+?)```', rsp['content'], re.DOTALL)
            if triple_match:
                text = triple_match.group(1)
                if text.startswith(prefix):
                    # Drop a leading language tag such as "python".
                    text = text[len(prefix):]
                rsp['function_call']['name'] = self.tool_list[0].get('name_for_model')
                rsp['function_call']['arguments'] = text
        return rsp
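
    # Fallback example: a reply with no "Action:" line but a fenced block, e.g.
    #   'Here is the code:\n```python\nprint(1)\n```'
    # yields arguments 'print(1)\n' and the name of tool_list[0]; the language
    # tag on the fence line is consumed by the regex.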

    async def _core(self, query, messages=None):
        """Drive the ReAct loop: ask the LLM for a plan, execute the parsed
        tool call via run_function, and retry with error feedback until the
        call succeeds or the retry budget is exhausted."""
        bot_msg, func_msg = None, None
        is_success = False
        local_message = copy.deepcopy(messages) if messages else []
        local_message.insert(0, {'role': 'system', 'content': self.gen_system_prompt(self.tool_list)})
        local_message.append({'role': 'user', 'content': query})
        print(f"local_message:{local_message}")
        for msg in local_message:
            if 'history' in msg and msg['history']:
                print('is history message')
            else:
                yield ChatResponseChoice(role=msg['role'], content=msg['content'])
        observation = ''
        plugin_args = None
        self.retry_cnt = self.max_retry_round
        while not is_success and self.retry_cnt > 0:
            try:
                if not self.stream:
                    rsp = await self.llm.chat(model=self.llm_name, messages=local_message, stream=self.stream)
                    rsp = self.parse_response(rsp)
                else:
                    self.stream_rsp = ''
                    rsp = await self.llm.chat(model=self.llm_name, messages=local_message, stream=self.stream)
                    async for r in rsp:
                        if r:
                            self.stream_rsp += r
                            yield ChatResponseStreamChoice(role='assistant', delta=f"{r}")
                    yield ChatResponseStreamChoice(role='assistant', finish_reason='stop')
                    rsp = self.parse_response(self.stream_rsp)
                print('openai_rsp:', rsp)
            except Exception as e:
                traceback.print_exc()
                print(f'{type(e)}, {isinstance(e, RemoteProtocolError)}')
                if self.stream:
                    yield ChatResponseStreamChoice(role='assistant', finish_reason='flush')
                if isinstance(e, RemoteProtocolError):
                    # Back off before retrying a dropped connection.
                    await asyncio.sleep(2 ** self.max_retry_round + 2 ** (self.max_retry_round - self.retry_cnt + 1))
                self.retry_cnt -= 1
                if self.retry_cnt >= 0:
                    print('retry')
                    continue
            if rsp['function_call'] and rsp['function_call']['name'] is not None:
                yield ChatResponseChoice(
                    role='function',
                    content=f"""```{rsp['function_call']['arguments']}```"""
                )
                bot_msg = {
                    'role': 'assistant',
                    'content': rsp['content'],
                    'function_call': {
                        'name': rsp['function_call']['name'],
                        'arguments': rsp['function_call']['arguments'],
                    }
                }
                observation, plugin_args, is_success = await self.run_function(rsp['function_call']['arguments'])
                func_msg = {
                    'role': 'function',
                    'name': rsp['function_call']['name'],
                    'content': observation,
                }
                yield ChatResponseChoice(**func_msg)
                if not is_success:
                    self.retry_cnt -= 1
                    # Feed the failing code and its traceback back to the model.
                    user_msg = {
                        'role': 'user',
                        'content': f"""CODE:\n```\n{rsp['function_call']['arguments']}\n```\n
Traceback:\n{observation}\n
Based on the error above, fix [Action Input] and reply in this format:
Thought: the reason for the fix
Action: {rsp['function_call']['name']}
Action Input: the input parameters for this Action.""",
                    }
                    local_message.append(bot_msg)
                    local_message.append(user_msg)
                    yield ChatResponseChoice(**user_msg)
                else:
                    local_message.append(bot_msg)
                    local_message.append(func_msg)
            else:
                # No usable Action was parsed: ask the model to try again.
                bot_msg = {
                    'role': 'assistant',
                    'content': f"{rsp}",
                }
                user_msg = {
                    'role': 'user',
                    'content': """
[Action] does not contain a required API. Adjust [Action] and [Action Input] and reply in this format:
Thought: the reason for the change
Action: the API to use now
Action Input: the input parameters for the API. Put only the API parameters (parameters) as JSON here, nothing else.""",
                }
                local_message.append(bot_msg)
                local_message.append(user_msg)
                yield ChatResponseChoice(**user_msg)
                self.retry_cnt -= 1
        self.is_success = is_success
        self.plugin_args = plugin_args

    async def run(self, plan_context, messages=None):
        """Build the query from the planning context and stream the ReAct loop."""
        query = plan_context.get_context()
        async for msg in self._core(query, messages or []):
            yield msg
            # Track the latest message content as the execution result.
            self.exec_res = msg.content

    def parse_parameter(self, plugin_args):
        """Strip a Markdown code fence from the tool arguments, if present."""
        print("plugin_args", plugin_args)
        if plugin_args and plugin_args.startswith('```'):
            triple_match = re.search(r'```[^\n]*\n(.+?)```', plugin_args, re.DOTALL)
            if triple_match:
                plugin_args = triple_match.group(1)
            else:
                # No closing fence: keep everything after the opening fence line.
                triple_match = re.search(r'```[^\n]*\n(.+)', plugin_args, re.DOTALL)
                if triple_match:
                    plugin_args = triple_match.group(1)
            print("plugin_args_clean:", plugin_args)
        return plugin_args
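
    # e.g. parse_parameter('```json\n{"a": 1}\n```') returns '{"a": 1}\n'.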

    async def run_function(self, plugin_args):
        """Execute the tool call; subclasses must implement this and return a
        (observation, plugin_args, is_success) tuple."""
        raise NotImplementedError

    async def continue_run(self, plan_context, user_input, messages):
        """Resume an interrupted run with fresh user input; subclasses may override."""
        raise NotImplementedError
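
# --- Usage sketch (illustrative only; the names below are hypothetical) ---
# A minimal subclass wiring BaseSubAgent to a trivial "echo" tool, assuming an
# LLMClient instance `llm` and a `plan_context` object exposing get_context():
#
#   class EchoAgent(BaseSubAgent):
#       async def run_function(self, plugin_args):
#           args = self.parse_parameter(plugin_args)
#           # Return (observation, plugin_args, is_success) as _core expects.
#           return f"echo: {args}", args, True
#
#   agent = EchoAgent(llm=llm, name='echo_agent')
#   agent.tool_list = [{'name_for_model': 'echo', 'name_for_human': 'Echo',
#                       'description_for_model': 'Echoes its input.',
#                       'parameters': [{'name': 'text', 'type': 'string'}]}]
#
#   async for choice in agent.run(plan_context):
#       print(choice)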