storage.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import os
  2. from typing import Dict, Optional, Union
  3. from qwen_agent.settings import DEFAULT_WORKSPACE
  4. from qwen_agent.tools.base import BaseTool, register_tool
  5. from qwen_agent.utils.utils import read_text_from_file, save_text_to_file
  6. class KeyNotExistsError(ValueError):
  7. pass
  8. @register_tool('storage')
  9. class Storage(BaseTool):
  10. """
  11. This is a special tool for data storage
  12. """
  13. description = '存储和读取数据的工具'
  14. parameters = [{
  15. 'name': 'operate',
  16. 'type': 'string',
  17. 'description': '数据操作类型,可选项为["put", "get", "delete", "scan"]之一,分别为存数据、取数据、删除数据、遍历数据',
  18. 'required': True
  19. }, {
  20. 'name': 'key',
  21. 'type': 'string',
  22. 'description': '数据的路径,类似于文件路径,是一份数据的唯一标识,不能为空,默认根目录为`/`。存数据时,应该合理的设计路径,保证路径含义清晰且唯一。',
  23. 'default': '/'
  24. }, {
  25. 'name': 'value',
  26. 'type': 'string',
  27. 'description': '数据的内容,仅存数据时需要'
  28. }]
  29. def __init__(self, cfg: Optional[Dict] = None):
  30. super().__init__(cfg)
  31. self.root = self.cfg.get('storage_root_path', os.path.join(DEFAULT_WORKSPACE, 'tools', self.name))
  32. os.makedirs(self.root, exist_ok=True)
  33. def call(self, params: Union[str, dict], **kwargs) -> str:
  34. params = self._verify_json_format_args(params)
  35. operate = params['operate']
  36. key = params.get('key', '/')
  37. if key.startswith('/'):
  38. key = key[1:]
  39. if operate == 'put':
  40. assert 'value' in params
  41. return self.put(key, params['value'])
  42. elif operate == 'get':
  43. return self.get(key)
  44. elif operate == 'delete':
  45. return self.delete(key)
  46. else:
  47. return self.scan(key)
  48. def put(self, key: str, value: str, path: Optional[str] = None) -> str:
  49. path = path or self.root
  50. # one file for one key value pair
  51. path = os.path.join(path, key)
  52. path_dir = path[:path.rfind('/') + 1]
  53. if path_dir:
  54. os.makedirs(path_dir, exist_ok=True)
  55. save_text_to_file(path, value)
  56. return f'Successfully saved {key}.'
  57. def get(self, key: str, path: Optional[str] = None) -> str:
  58. path = path or self.root
  59. if not os.path.exists(os.path.join(path, key)):
  60. raise KeyNotExistsError(f'Get Failed: {key} does not exist')
  61. return read_text_from_file(os.path.join(path, key))
  62. def delete(self, key, path: Optional[str] = None) -> str:
  63. path = path or self.root
  64. path = os.path.join(path, key)
  65. if os.path.exists(path):
  66. os.remove(path)
  67. return f'Successfully deleted {key}'
  68. else:
  69. return f'Delete Failed: {key} does not exist'
  70. def scan(self, key: str, path: Optional[str] = None) -> str:
  71. path = path or self.root
  72. path = os.path.join(path, key)
  73. if os.path.exists(path):
  74. if not os.path.isdir(path):
  75. return 'Scan Failed: The scan operation requires passing in a folder path as the key.'
  76. # All key-value pairs
  77. kvs = {}
  78. for root, dirs, files in os.walk(path):
  79. for file in files:
  80. k = os.path.join(root, file)[len(path):]
  81. if not k.startswith('/'):
  82. k = '/' + k
  83. v = read_text_from_file(os.path.join(root, file))
  84. kvs[k] = v
  85. return '\n'.join([f'{k}: {v}' for k, v in kvs.items()])
  86. else:
  87. return f'Scan Failed: {key} does not exist.'