doc_parser.py

import json
import os
import re
import time
from typing import Dict, List, Optional, Union

import json5
from pydantic import BaseModel

from qwen_agent.log import logger
from qwen_agent.settings import DEFAULT_MAX_REF_TOKEN, DEFAULT_PARSER_PAGE_SIZE, DEFAULT_WORKSPACE
from qwen_agent.tools.base import BaseTool, register_tool
from qwen_agent.tools.simple_doc_parser import PARAGRAPH_SPLIT_SYMBOL, SimpleDocParser, get_plain_doc
from qwen_agent.tools.storage import KeyNotExistsError, Storage
from qwen_agent.utils.tokenization_qwen import count_tokens, tokenizer
from qwen_agent.utils.utils import get_basename_from_url, hash_sha256


class Chunk(BaseModel):
    content: str
    metadata: dict
    token: int

    def __init__(self, content: str, metadata: dict, token: int):
        super().__init__(content=content, metadata=metadata, token=token)

    def to_dict(self) -> dict:
        return {'content': self.content, 'metadata': self.metadata, 'token': self.token}


class Record(BaseModel):
    url: str
    raw: List[Chunk]
    title: str

    def __init__(self, url: str, raw: List[Chunk], title: str):
        super().__init__(url=url, raw=raw, title=title)

    def to_dict(self) -> dict:
        return {'url': self.url, 'raw': [x.to_dict() for x in self.raw], 'title': self.title}


@register_tool('doc_parser')
class DocParser(BaseTool):
    # Description (zh): extract the content of a file, split it into chunks, and return the chunked content.
    description = '对一个文件进行内容提取和分块、返回分块后的文件内容'
    parameters = [{
        'name': 'url',
        'type': 'string',
        # Description (zh): path of the file to parse, either a local path or a downloadable http(s) link.
        'description': '待解析的文件的路径,可以是一个本地路径或可下载的http(s)链接',
        'required': True
    }]

    def __init__(self, cfg: Optional[Dict] = None):
        super().__init__(cfg)
        self.max_ref_token: int = self.cfg.get('max_ref_token', DEFAULT_MAX_REF_TOKEN)
        self.parser_page_size: int = self.cfg.get('parser_page_size', DEFAULT_PARSER_PAGE_SIZE)
        self.data_root = self.cfg.get('path', os.path.join(DEFAULT_WORKSPACE, 'tools', self.name))
        self.db = Storage({'storage_root_path': self.data_root})
        self.doc_extractor = SimpleDocParser({'structured_doc': True})

    def call(self, params: Union[str, dict], **kwargs) -> dict:
        """Extract the content of a document and split it into chunks.

        Returns:
            The parsed doc as the following chunks:
            {
                'url': 'This is the url of this file',
                'title': 'This is the extracted title of this file',
                'raw': [
                    {
                        'content': 'This is one chunk',
                        'token': 'The token number',
                        'metadata': {}  # some information of this chunk
                    },
                    ...,
                ]
            }
        """
        params = self._verify_json_format_args(params)

        # Compatible with the parameter passing of the qwen-agent version <= 0.0.3
        max_ref_token = kwargs.get('max_ref_token', self.max_ref_token)
        parser_page_size = kwargs.get('parser_page_size', self.parser_page_size)

        url = params['url']
        cached_name_chunking = f'{hash_sha256(url)}_{str(parser_page_size)}'
        try:
            # Directly load the chunked doc
            record = self.db.get(cached_name_chunking)
            try:
                record = json5.loads(record)
            except ValueError:
                logger.warning(
                    f'Encountered ValueError raised by json5. Fall back to json. File: {cached_name_chunking}')
                record = json.loads(record)
            logger.info(f'Read chunked {url} from cache.')
            return record
        except KeyNotExistsError:
            doc = self.doc_extractor.call({'url': url})

            total_token = 0
            for page in doc:
                for para in page['content']:
                    total_token += para['token']

            if doc and 'title' in doc[0]:
                title = doc[0]['title']
            else:
                title = get_basename_from_url(url)

            logger.info(f'Start chunking {url} ({title})...')
            time1 = time.time()

            if total_token <= max_ref_token:
                # The whole doc is one chunk
                content = [
                    Chunk(content=get_plain_doc(doc),
                          metadata={
                              'source': url,
                              'title': title,
                              'chunk_id': 0
                          },
                          token=total_token)
                ]
                cached_name_chunking = f'{hash_sha256(url)}_without_chunking'
            else:
                content = self.split_doc_to_chunk(doc, url, title=title, parser_page_size=parser_page_size)

            time2 = time.time()
            logger.info(f'Finished chunking {url} ({title}). Time spent: {time2 - time1} seconds.')

            # Save the document data
            new_record = Record(url=url, raw=content, title=title).to_dict()
            new_record_str = json.dumps(new_record, ensure_ascii=False)
            self.db.put(cached_name_chunking, new_record_str)
            return new_record

    def split_doc_to_chunk(self,
                           doc: List[dict],
                           path: str,
                           title: str = '',
                           parser_page_size: int = DEFAULT_PARSER_PAGE_SIZE) -> List[Chunk]:
        """Split a structured doc (as produced by SimpleDocParser) into chunks.

        Each chunk holds at most `parser_page_size` tokens, is prefixed with a
        `[page: N]` marker, and carries a short overlap of trailing text into the
        next chunk. Overly long paragraphs are further split by sentence.
        """
        res = []
        chunk = []
        available_token = parser_page_size
        has_para = False
        for page in doc:
            page_num = page['page_num']
            if not chunk or f'[page: {str(page_num)}]' != chunk[0]:
                chunk.append(f'[page: {str(page_num)}]')
            idx = 0
            len_para = len(page['content'])
            while idx < len_para:
                if not chunk:
                    chunk.append(f'[page: {str(page_num)}]')
                para = page['content'][idx]
                txt = para.get('text', para.get('table'))
                token = para['token']
                if token <= available_token:
                    available_token -= token
                    chunk.append([txt, page_num])
                    has_para = True
                    idx += 1
                else:
                    if has_para:
                        # Record one chunk
                        if isinstance(chunk[-1], str) and re.fullmatch(r'^\[page: \d+\]$', chunk[-1]) is not None:
                            chunk.pop()  # Redundant page information
                        res.append(
                            Chunk(content=PARAGRAPH_SPLIT_SYMBOL.join(
                                [x if isinstance(x, str) else x[0] for x in chunk]),
                                  metadata={
                                      'source': path,
                                      'title': title,
                                      'chunk_id': len(res)
                                  },
                                  token=parser_page_size - available_token))
                        # Define new chunk
                        overlap_txt = self._get_last_part(chunk)
                        if overlap_txt.strip():
                            chunk = [f'[page: {str(chunk[-1][1])}]', overlap_txt]
                            has_para = False
                            available_token = parser_page_size - count_tokens(overlap_txt)
                        else:
                            chunk = []
                            has_para = False
                            available_token = parser_page_size
                    else:
                        # There are excessively long paragraphs present
                        # Split paragraph to sentences
                        _sentences = re.split(r'\. |。', txt)
                        sentences = []
                        for s in _sentences:
                            token = count_tokens(s)
                            if not s.strip() or token == 0:
                                continue
                            if token <= available_token:
                                sentences.append([s, token])
                            else:
                                # Limit the length of a sentence to chunk size
                                token_list = tokenizer.tokenize(s)
                                for si in range(0, len(token_list), available_token):
                                    ss = tokenizer.convert_tokens_to_string(
                                        token_list[si:min(len(token_list), si + available_token)])
                                    sentences.append([ss, min(available_token, len(token_list) - si)])

                        sent_index = 0
                        while sent_index < len(sentences):
                            s = sentences[sent_index][0]
                            token = sentences[sent_index][1]
                            if not chunk:
                                chunk.append(f'[page: {str(page_num)}]')
                            if token <= available_token or (not has_para):
                                # Be sure to add at least one sentence
                                # (not has_para) is a patch of the previous sentence splitting
                                available_token -= token
                                chunk.append([s, page_num])
                                has_para = True
                                sent_index += 1
                            else:
                                assert has_para
                                if isinstance(chunk[-1], str) and re.fullmatch(r'^\[page: \d+\]$',
                                                                               chunk[-1]) is not None:
                                    chunk.pop()  # Redundant page information
                                res.append(
                                    Chunk(content=PARAGRAPH_SPLIT_SYMBOL.join(
                                        [x if isinstance(x, str) else x[0] for x in chunk]),
                                          metadata={
                                              'source': path,
                                              'title': title,
                                              'chunk_id': len(res)
                                          },
                                          token=parser_page_size - available_token))
                                overlap_txt = self._get_last_part(chunk)
                                if overlap_txt.strip():
                                    chunk = [f'[page: {str(chunk[-1][1])}]', overlap_txt]
                                    has_para = False
                                    available_token = parser_page_size - count_tokens(overlap_txt)
                                else:
                                    chunk = []
                                    has_para = False
                                    available_token = parser_page_size

                        # Has split this paragraph by sentence
                        idx += 1

        if has_para:
            if isinstance(chunk[-1], str) and re.fullmatch(r'^\[page: \d+\]$', chunk[-1]) is not None:
                chunk.pop()  # Redundant page information
            res.append(
                Chunk(content=PARAGRAPH_SPLIT_SYMBOL.join([x if isinstance(x, str) else x[0] for x in chunk]),
                      metadata={
                          'source': path,
                          'title': title,
                          'chunk_id': len(res)
                      },
                      token=parser_page_size - available_token))

        return res

    def _get_last_part(self, chunk: list) -> str:
        """Collect the trailing text of the current chunk (same page only, up to
        roughly 150 characters) to be prepended to the next chunk as overlap."""
        overlap = ''
        need_page = chunk[-1][1]  # Only need this page to prepend
        available_len = 150
        for i in range(len(chunk) - 1, -1, -1):
            if not (isinstance(chunk[i], list) and len(chunk[i]) == 2):
                continue
            if chunk[i][1] != need_page:
                return overlap
            para = chunk[i][0]
            if len(para) <= available_len:
                if overlap:
                    overlap = f'{para}{PARAGRAPH_SPLIT_SYMBOL}{overlap}'
                else:
                    overlap = f'{para}'
                available_len -= len(para)
                continue

            sentence_split_symbol = '. '
            if '。' in para:
                sentence_split_symbol = '。'
            sentences = re.split(r'\. |。', para)
            sentences = [sentence.strip() for sentence in sentences if sentence]
            for j in range(len(sentences) - 1, -1, -1):
                sent = sentences[j]
                if not sent.strip():
                    continue
                if len(sent) <= available_len:
                    if overlap:
                        overlap = f'{sent}{sentence_split_symbol}{overlap}'
                    else:
                        overlap = f'{sent}'
                    available_len -= len(sent)
                else:
                    return overlap
        return overlap
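

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal example of calling
# DocParser directly. The config keys and the returned structure come from the
# class above; the file path and the numeric config values are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    parser = DocParser({'max_ref_token': 4000, 'parser_page_size': 500})
    parsed = parser.call({'url': 'docs/report.pdf'})  # local path or http(s) link
    print(parsed['title'])
    for chunk in parsed['raw']:
        # Each chunk carries its text, token count, and source metadata.
        print(chunk['metadata']['chunk_id'], chunk['token'], chunk['content'][:80])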