import copy import random from typing import Dict, Iterator, List, Optional, Union from qwen_agent import Agent, MultiAgentHub from qwen_agent.agents.assistant import Assistant from qwen_agent.agents.group_chat_auto_router import GroupChatAutoRouter from qwen_agent.agents.user_agent import PENDING_USER_INPUT, UserAgent from qwen_agent.llm import BaseChatModel from qwen_agent.llm.schema import Message from qwen_agent.log import logger from qwen_agent.tools import BaseTool class GroupChat(Agent, MultiAgentHub): """This is an agent for multi-agent management. This agent can accept a list of agents, manage their speaking order, and output the response of each agent. """ _VALID_AGENT_SELECTION_METHODS = ['manual', 'round_robin', 'random', 'auto'] def __init__(self, agents: Union[List[Agent], Dict], agent_selection_method: Optional[str] = 'auto', function_list: Optional[List[Union[str, Dict, BaseTool]]] = None, llm: Optional[Union[Dict, BaseChatModel]] = None, **kwargs): """Initialization the agent. Args: agents: A list of agents of agent configurations. One configuration example is: { 'background': 'An interest group', 'agents': [{ 'name': 'Tang Xiao', 'description': 'A hardworking worker, addicted to work every day, gradually losing weight.', 'is_human': True # mark this as a real person }, { 'name': 'Tou Da', 'description': 'A sports student', 'instructions': 'You are a sports student who loves sports.', 'knowledge_files': ['http://example.html'], 'selected_tools': ['image_gen'] }] } agent_selection_method: The method of select speaker: (1) auto: Using one host agent to choose the speaker according to the context. (2) round_robin: Speak in order. (3) random: Random speech. function_list: The tools for inputting to the host. llm: The LLM for inputting to the host. """ super().__init__(**kwargs) assert agent_selection_method in self._VALID_AGENT_SELECTION_METHODS, f'You must choose agent_selection_method from {", ".join(self._VALID_AGENT_SELECTION_METHODS)}' self.agent_selection_method = agent_selection_method if isinstance(agents, dict): self._agents = self._init_agents_from_config(agents, llm=llm) else: self._agents = agents if self.agent_selection_method == 'auto': assert llm is not None, 'Need to provide LLM to the host in auto mode' self.host = GroupChatAutoRouter(function_list=function_list, llm=llm, agents=self.agents, name='host') def _run(self, messages: List[Message] = None, lang: str = 'zh', max_round: Optional[int] = 3, need_batch_response: bool = True, mentioned_agents_name: List[str] = None, **kwargs) -> Iterator[List[Message]]: messages = copy.deepcopy(messages) for message in messages: if message.role == 'assistant': assert message.name, 'In group chat, each agent must be given a name' # Name will be used for router # Todo: Dealing with situations where there are no real players if not message.name: message.name = message.role if need_batch_response: return self._gen_batch_response(messages=messages, lang=lang, max_round=max_round, mentioned_agents_name=mentioned_agents_name, **kwargs) else: return self._gen_one_response(messages=messages, lang=lang, mentioned_agents_name=mentioned_agents_name, **kwargs) def _gen_batch_response(self, messages: List[Message] = None, lang: str = 'zh', max_round: Optional[int] = 3, mentioned_agents_name: List[str] = None, **kwargs) -> Iterator[List[Message]]: # Record all mentioned agents: reply in order mentioned_agents_name = mentioned_agents_name or [] messages = copy.deepcopy(messages) response = [] for i in range(max_round): if isinstance(messages[-1].content, list): content = '\n'.join([x.text if x.text else '' for x in messages[-1].content]).strip() else: content = messages[-1].content.strip() if '@' in content: for x in content.split('@'): for agent in self.agents: if x.startswith(agent.name): if agent not in mentioned_agents_name: mentioned_agents_name.append(agent.name) break rsp = [] for rsp in self._gen_one_response(messages=messages, lang=lang, mentioned_agents_name=mentioned_agents_name, **kwargs): yield response + rsp if not rsp: # The topic ends break if mentioned_agents_name: assert rsp[-1].name == mentioned_agents_name[0] mentioned_agents_name.pop(0) response += rsp if rsp[-1].content == PENDING_USER_INPUT: # Terminate group chat and wait for user input break messages.extend(rsp) yield response def _gen_one_response(self, messages: List[Message] = None, lang: str = 'zh', mentioned_agents_name: List[str] = None, **kwargs) -> Iterator[List[Message]]: selected_agent = self._select_agent(messages, mentioned_agents_name, lang) if selected_agent: logger.info(f'selected_agent_name: {selected_agent.name}') new_messages = self._manage_messages(messages, selected_agent.name) for rsp in selected_agent.run(messages=new_messages, **kwargs): yield rsp else: yield [] def _select_agent(self, messages: List[Message], mentioned_agents_name: List[str] = None, lang: str = 'zh') -> Union[Agent, None]: agents_map = {x.name: x for x in self.agents} if mentioned_agents_name: # Manually select agent return agents_map[mentioned_agents_name[0]] if self.agent_selection_method == 'auto': *_, last = self.host.run(messages=messages, lang=lang) auto_selected_agent = None if isinstance(last[-1]['content'], str): auto_selected_agent = last[-1]['content'] else: assert isinstance(last[-1]['content'], list) if 'text' in last[-1]['content'][0]: auto_selected_agent = last[-1]['content'][0]['text'] if auto_selected_agent in agents_map.keys(): return agents_map[auto_selected_agent] elif auto_selected_agent == '[STOP]': return None if self.agent_selection_method == 'random': agent = random.choice(list(self.agents)) return agent if self.agent_selection_method == 'manual': for i in range(3): agent_key = input('Please enter the selected agent name: ') if agent_key in agents_map.keys(): return agents_map[agent_key] else: logger.warning(f'Please select one agent from {str(list(agents_map.keys()))}') # round_robin if messages: agents_list = [x.name for x in self.agents] try: last_agent_index = agents_list.index(messages[-1]['name']) except ValueError: last_agent_index = -1 else: last_agent_index = -1 return self.agents[(last_agent_index + 1) % len(self.agents)] def _manage_messages(self, messages: List[Message], name: str) -> List[Message]: new_messages = [] new_msg = None i = 0 while i < len(messages): msg = messages[i] if msg.name == name: if new_msg: # Have 'user' before 'assistant' new_messages.append(new_msg) if not msg.function_call and ( # noqa (not new_messages) or (new_messages[-1].name == name)): # noqa new_messages.append(Message('user', f'{name}: ')) new_msg = copy.deepcopy(msg) new_msg.role = 'assistant' new_messages.append(new_msg) new_msg = None if msg.function_call: # Append the function call msg assert messages[i + 1].role == 'function' new_messages.append(copy.deepcopy(messages[i + 1])) i += 1 else: if isinstance(msg.content, list): content = '\n'.join([x.text if x.text else '' for x in msg.content]).strip() else: content = msg.content.strip() if content.strip(): if not new_msg: new_msg = Message('user', f'{msg.name}: {content.strip()}') else: new_msg.content += f'\n{msg.name}: {content.strip()}' if msg.function_call: # Skip the function call msg assert messages[i + 1].role == 'function' assert messages[i + 2].role == 'assistant' and messages[i + 2].name == msg.name i += 1 i += 1 if new_msg: new_messages.append(new_msg) if new_messages and new_messages[-1].role == 'user': new_messages[-1].content += f'\n{name}: ' else: new_messages.append(Message('user', f'{name}: ')) return new_messages def _init_agents_from_config(self, cfgs: Dict, llm: Optional[Union[Dict, BaseChatModel]] = None) -> List[Agent]: def _build_system_from_role_config(config: Dict): role_chat_prompt = """你是{name}。{description}\n\n{instructions}""" name = config.get('name', '').strip() description = config.get('description', '').lstrip('\n').rstrip() instructions = config.get('instructions', '').lstrip('\n').rstrip() if len(instructions) >= len(description): description = '' # redundant, as we already have instructions else: description = f'你的简介是:{description}' prompt = role_chat_prompt.format(name=name, description=description, instructions=instructions) knowledge_files = config.get('knowledge_files', []) selected_tools = config.get('selected_tools', []) return prompt, knowledge_files, selected_tools agents = [] groupchat_background = '你在一个群聊中,' if cfgs.get('background', ''): groupchat_background += f'群聊背景为:{cfgs["background"]}' for cfg in cfgs['agents']: system, knowledge_files, selected_tools = _build_system_from_role_config(cfg) if 'is_human' in cfg and cfg['is_human']: # Append human agent agents.append(UserAgent(name=cfg['name'], description=cfg['description'])) else: # Create npc agent by config other_agents = [] for x in cfgs['agents']: if x['name'] != cfg['name']: other_agents.append(x['name']) agents.append( Assistant(llm=llm, system_message=groupchat_background + system + f'\n\n群里其他成员包括:{", ".join(other_agents)},如果你想和别人对话,可以@成员名字。\n' + '\n\n讲话时请直接输出内容,不要输出你的名字。\n\n其他群友的发言历史以如下格式展示:\n角色名: 说话内容', files=knowledge_files, function_list=selected_tools, name=cfg['name'], description=cfg['description'])) return agents