123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import os
- from typing import Dict, Optional, Union
- from qwen_agent.settings import DEFAULT_WORKSPACE
- from qwen_agent.tools.base import BaseTool, register_tool
- from qwen_agent.utils.utils import read_text_from_file, save_text_to_file
- class KeyNotExistsError(ValueError):
- pass
- @register_tool('storage')
- class Storage(BaseTool):
- """
- This is a special tool for data storage
- """
- description = '存储和读取数据的工具'
- parameters = [{
- 'name': 'operate',
- 'type': 'string',
- 'description': '数据操作类型,可选项为["put", "get", "delete", "scan"]之一,分别为存数据、取数据、删除数据、遍历数据',
- 'required': True
- }, {
- 'name': 'key',
- 'type': 'string',
- 'description': '数据的路径,类似于文件路径,是一份数据的唯一标识,不能为空,默认根目录为`/`。存数据时,应该合理的设计路径,保证路径含义清晰且唯一。',
- 'default': '/'
- }, {
- 'name': 'value',
- 'type': 'string',
- 'description': '数据的内容,仅存数据时需要'
- }]
- def __init__(self, cfg: Optional[Dict] = None):
- super().__init__(cfg)
- self.root = self.cfg.get('storage_root_path', os.path.join(DEFAULT_WORKSPACE, 'tools', self.name))
- os.makedirs(self.root, exist_ok=True)
- def call(self, params: Union[str, dict], **kwargs) -> str:
- params = self._verify_json_format_args(params)
- operate = params['operate']
- key = params.get('key', '/')
- if key.startswith('/'):
- key = key[1:]
- if operate == 'put':
- assert 'value' in params
- return self.put(key, params['value'])
- elif operate == 'get':
- return self.get(key)
- elif operate == 'delete':
- return self.delete(key)
- else:
- return self.scan(key)
- def put(self, key: str, value: str, path: Optional[str] = None) -> str:
- path = path or self.root
- # one file for one key value pair
- path = os.path.join(path, key)
- path_dir = path[:path.rfind('/') + 1]
- if path_dir:
- os.makedirs(path_dir, exist_ok=True)
- save_text_to_file(path, value)
- return f'Successfully saved {key}.'
- def get(self, key: str, path: Optional[str] = None) -> str:
- path = path or self.root
- if not os.path.exists(os.path.join(path, key)):
- raise KeyNotExistsError(f'Get Failed: {key} does not exist')
- return read_text_from_file(os.path.join(path, key))
- def delete(self, key, path: Optional[str] = None) -> str:
- path = path or self.root
- path = os.path.join(path, key)
- if os.path.exists(path):
- os.remove(path)
- return f'Successfully deleted {key}'
- else:
- return f'Delete Failed: {key} does not exist'
- def scan(self, key: str, path: Optional[str] = None) -> str:
- path = path or self.root
- path = os.path.join(path, key)
- if os.path.exists(path):
- if not os.path.isdir(path):
- return 'Scan Failed: The scan operation requires passing in a folder path as the key.'
- # All key-value pairs
- kvs = {}
- for root, dirs, files in os.walk(path):
- for file in files:
- k = os.path.join(root, file)[len(path):]
- if not k.startswith('/'):
- k = '/' + k
- v = read_text_from_file(os.path.join(root, file))
- kvs[k] = v
- return '\n'.join([f'{k}: {v}' for k, v in kvs.items()])
- else:
- return f'Scan Failed: {key} does not exist.'
|