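"""Sync and async chat-client wrappers for an OpenAI-compatible endpoint.

Both clients support plain and streaming completions; streaming strips
`<think>...</think>` reasoning blocks from the yielded chunks.
"""
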
import asyncio
import copy
import re

from openai import OpenAI, AsyncOpenAI

# from qwen_agent.llm.base import LLMBase


class LLMClient:
    def __init__(self, model='qwen-plus', api_key='none', model_server=''):
        self.model_server = model_server
        self._client = OpenAI(api_key=api_key, base_url=model_server)
        self.model = model

    def chat(self, query=None, model=None, stream=False, messages=None, stop=None, functions=None):
        if not model:
            model = self.model
        if stream:
            return self._chat_stream(query, model, messages, stop=stop, functions=functions)
        return self._chat_no_stream(query, model, messages, stop=stop, functions=functions)

    def _chat_stream(self, query, model, messages=None, stop=None, functions=None, **kwargs):
        # Strip <think>...</think> reasoning blocks from streamed chunks.
        # Note: a block split across chunk boundaries passes through intact.
        pattern = r'<think>.*?</think>'
        print(f'begin: stream to lianqi client, model name: {model}')
        if not messages:
            messages = [{'role': 'user', 'content': query}]
        if functions:
            # Forward `functions` only when provided; some OpenAI-compatible
            # backends reject an explicit null value.
            kwargs['functions'] = functions
        response = self._client.chat.completions.create(
            model=model, messages=messages, stop=stop, stream=True, **kwargs)
        for chunk in response:
            if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end='', flush=True)
                yield re.sub(pattern, '', chunk.choices[0].delta.content, flags=re.DOTALL)

    def _chat_no_stream(self, query, model=None, messages=None, stop=None, functions=None, **kwargs):
        print(f'begin: no stream to lianqi client, model name: {model}')
        if not model:
            model = self.model
        if messages:
            if query:
                # Append the query as a final user turn without mutating the
                # caller's message list.
                _messages = copy.deepcopy(messages)
                _messages.append({'role': 'user', 'content': query})
            else:
                _messages = messages
        else:
            _messages = [{'role': 'user', 'content': query}]
        if functions:
            kwargs['functions'] = functions
        response = self._client.chat.completions.create(
            model=model, messages=_messages, stop=stop, stream=False, **kwargs)
        return response.choices[0].message.content

    def chat_func(self, model=None, messages=None, functions=None):
        print('begin: no stream in lianqi chat_func')
        if not model:
            model = self.model
        if messages is None:
            messages = []
        if functions:
            response = self._client.chat.completions.create(
                model=model, messages=messages, functions=functions)
        else:
            response = self._client.chat.completions.create(model=model, messages=messages)
        return response.choices[0].message
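

# A minimal usage sketch for LLMClient (the endpoint URL and API key are
# placeholders, not real values):
#
#   client = LLMClient(model='qwen-plus', api_key='sk-...',
#                      model_server='https://example.com/v1')
#   answer = client.chat(query='Hello')                   # completion text
#   for token in client.chat(query='Hello', stream=True):
#       ...                                               # <think> blocks stripped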


class LLMAsyncClient:
    def __init__(self, model='qwen-plus', api_key='none', model_server=''):
        self.model_server = model_server
        self._client = AsyncOpenAI(api_key=api_key, base_url=model_server)
        self.model = model

    async def chat(self, query=None, model=None, stream=False, messages=None, stop=None, functions=None):
        if not model:
            model = self.model
        if stream:
            # Return the async generator itself; iterate it with `async for`.
            return self._chat_stream(query, model, messages, stop=stop, functions=functions)
        return await self._chat_no_stream(query, model, messages, stop=stop, functions=functions)

    async def _chat_stream(self, query, model, messages=None, stop=None, functions=None, **kwargs):
        # Strip <think>...</think> blocks (and trailing newlines) from
        # streamed chunks; a block split across chunks passes through intact.
        pattern = r'<think>.*?</think>\n*'
        print(f'begin: stream to lianqi client, model name: {model}')
        if not messages:
            messages = [{'role': 'user', 'content': query}]
        if functions:
            kwargs['functions'] = functions
        response = await self._client.chat.completions.create(
            model=model, messages=messages, stop=stop, stream=True, **kwargs)
        async for chunk in response:
            if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end='', flush=True)
                yield re.sub(pattern, '', chunk.choices[0].delta.content, flags=re.DOTALL)

    async def _chat_no_stream(self, query, model=None, messages=None, stop=None, functions=None, **kwargs):
        print(f'begin: no stream to lianqi client, model name: {model}')
        if not model:
            model = self.model
        if messages:
            if query:
                # Append the query as a final user turn without mutating the
                # caller's message list.
                _messages = copy.deepcopy(messages)
                _messages.append({'role': 'user', 'content': query})
            else:
                _messages = messages
        else:
            _messages = [{'role': 'user', 'content': query}]
        if functions:
            kwargs['functions'] = functions
        response = await self._client.chat.completions.create(
            model=model, messages=_messages, stop=stop, stream=False, **kwargs)
        return response.choices[0].message.content
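

# A minimal async usage sketch (assumes a reachable OpenAI-compatible
# endpoint; the URL and API key are placeholders):
if __name__ == '__main__':
    async def _demo():
        client = LLMAsyncClient(api_key='sk-...', model_server='https://example.com/v1')
        print(await client.chat(query='Hello'))
        stream = await client.chat(query='Hello', stream=True)
        async for _token in stream:
            pass  # tokens are already echoed to stdout by _chat_stream

    asyncio.run(_demo())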