import json
import os
import re
import time
from typing import Dict, List, Optional, Union

import json5
from pydantic import BaseModel

from qwen_agent.log import logger
from qwen_agent.settings import DEFAULT_MAX_REF_TOKEN, DEFAULT_PARSER_PAGE_SIZE, DEFAULT_WORKSPACE
from qwen_agent.tools.base import BaseTool, register_tool
from qwen_agent.tools.simple_doc_parser import PARAGRAPH_SPLIT_SYMBOL, SimpleDocParser, get_plain_doc
from qwen_agent.tools.storage import KeyNotExistsError, Storage
from qwen_agent.utils.tokenization_qwen import count_tokens, tokenizer
from qwen_agent.utils.utils import get_basename_from_url, hash_sha256


class Chunk(BaseModel):
    content: str
    metadata: dict
    token: int

    def __init__(self, content: str, metadata: dict, token: int):
        super().__init__(content=content, metadata=metadata, token=token)

    def to_dict(self) -> dict:
        return {'content': self.content, 'metadata': self.metadata, 'token': self.token}


class Record(BaseModel):
    url: str
    raw: List[Chunk]
    title: str

    def __init__(self, url: str, raw: List[Chunk], title: str):
        super().__init__(url=url, raw=raw, title=title)

    def to_dict(self) -> dict:
        return {'url': self.url, 'raw': [x.to_dict() for x in self.raw], 'title': self.title}


@register_tool('doc_parser')
class DocParser(BaseTool):
    description = 'Extract the content of a file, split it into chunks, and return the chunked file content.'
    parameters = [{
        'name': 'url',
        'type': 'string',
        'description': 'Path of the file to parse; it can be a local path or a downloadable http(s) link.',
        'required': True
    }]

    def __init__(self, cfg: Optional[Dict] = None):
        super().__init__(cfg)
        self.max_ref_token: int = self.cfg.get('max_ref_token', DEFAULT_MAX_REF_TOKEN)
        self.parser_page_size: int = self.cfg.get('parser_page_size', DEFAULT_PARSER_PAGE_SIZE)
        self.data_root = self.cfg.get('path', os.path.join(DEFAULT_WORKSPACE, 'tools', self.name))
        self.db = Storage({'storage_root_path': self.data_root})
        self.doc_extractor = SimpleDocParser({'structured_doc': True})

    def call(self, params: Union[str, dict], **kwargs) -> dict:
        """Extract a document and split it into chunks.

        Returns:
            The parsed doc as the following structure:
            {
                'url': 'The url of this file',
                'title': 'The title extracted from this file',
                'raw': [
                    {
                        'content': 'This is one chunk',
                        'token': 'The number of tokens in this chunk',
                        'metadata': {}  # some information about this chunk
                    },
                    ...,
                ]
            }
        """
        params = self._verify_json_format_args(params)

        # Compatible with the parameter passing of the qwen-agent version <= 0.0.3
        max_ref_token = kwargs.get('max_ref_token', self.max_ref_token)
        parser_page_size = kwargs.get('parser_page_size', self.parser_page_size)

        url = params['url']
        cached_name_chunking = f'{hash_sha256(url)}_{str(parser_page_size)}'
        try:
            # Directly load the chunked doc
            record = self.db.get(cached_name_chunking)
            try:
                record = json5.loads(record)
            except ValueError:
                logger.warning(
                    f'Encountered ValueError raised by json5. Fall back to json. File: {cached_name_chunking}')
                record = json.loads(record)
            logger.info(f'Read chunked {url} from cache.')
            return record
        except KeyNotExistsError:
            doc = self.doc_extractor.call({'url': url})

            total_token = 0
            for page in doc:
                for para in page['content']:
                    total_token += para['token']

            if doc and 'title' in doc[0]:
                title = doc[0]['title']
            else:
                title = get_basename_from_url(url)

            logger.info(f'Start chunking {url} ({title})...')
            time1 = time.time()

            if total_token <= max_ref_token:
                # The whole doc is one chunk
                content = [
                    Chunk(content=get_plain_doc(doc),
                          metadata={
                              'source': url,
                              'title': title,
                              'chunk_id': 0
                          },
                          token=total_token)
                ]
                cached_name_chunking = f'{hash_sha256(url)}_without_chunking'
            else:
                content = self.split_doc_to_chunk(doc, url, title=title, parser_page_size=parser_page_size)

            time2 = time.time()
            logger.info(f'Finished chunking {url} ({title}). Time spent: {time2 - time1} seconds.')

            # Save the document data
            new_record = Record(url=url, raw=content, title=title).to_dict()
            new_record_str = json.dumps(new_record, ensure_ascii=False)
            self.db.put(cached_name_chunking, new_record_str)
            return new_record
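
    # split_doc_to_chunk packs paragraphs greedily into chunks of at most
    # `parser_page_size` tokens. Chunks carry '[page: N]' markers so the page of
    # each paragraph is preserved, and when a chunk is emitted, a short tail of it
    # (built by _get_last_part) is carried over into the next chunk as overlap.
    # A paragraph too large for the remaining budget while the chunk is still
    # empty is split into sentences, and sentences that still exceed the budget
    # are split at the token level.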
    def split_doc_to_chunk(self,
                           doc: List[dict],
                           path: str,
                           title: str = '',
                           parser_page_size: int = DEFAULT_PARSER_PAGE_SIZE) -> List[Chunk]:
        res = []
        chunk = []
        available_token = parser_page_size
        has_para = False
        for page in doc:
            page_num = page['page_num']
            if not chunk or f'[page: {str(page_num)}]' != chunk[0]:
                chunk.append(f'[page: {str(page_num)}]')
            idx = 0
            len_para = len(page['content'])
            while idx < len_para:
                if not chunk:
                    chunk.append(f'[page: {str(page_num)}]')
                para = page['content'][idx]
                txt = para.get('text', para.get('table'))
                token = para['token']
                if token <= available_token:
                    available_token -= token
                    chunk.append([txt, page_num])
                    has_para = True
                    idx += 1
                else:
                    if has_para:
                        # Record one chunk
                        if isinstance(chunk[-1], str) and re.fullmatch(r'^\[page: \d+\]$', chunk[-1]) is not None:
                            chunk.pop()  # Redundant page information
                        res.append(
                            Chunk(content=PARAGRAPH_SPLIT_SYMBOL.join(
                                [x if isinstance(x, str) else x[0] for x in chunk]),
                                  metadata={
                                      'source': path,
                                      'title': title,
                                      'chunk_id': len(res)
                                  },
                                  token=parser_page_size - available_token))
                        # Define new chunk
                        overlap_txt = self._get_last_part(chunk)
                        if overlap_txt.strip():
                            chunk = [f'[page: {str(chunk[-1][1])}]', overlap_txt]
                            has_para = False
                            available_token = parser_page_size - count_tokens(overlap_txt)
                        else:
                            chunk = []
                            has_para = False
                            available_token = parser_page_size
                    else:
                        # There are excessively long paragraphs present
                        # Split paragraph to sentences
                        _sentences = re.split(r'\. |。', txt)
                        sentences = []
                        for s in _sentences:
                            token = count_tokens(s)
                            if not s.strip() or token == 0:
                                continue
                            if token <= available_token:
                                sentences.append([s, token])
                            else:
                                # Limit the length of a sentence to chunk size
                                token_list = tokenizer.tokenize(s)
                                for si in range(0, len(token_list), available_token):
                                    ss = tokenizer.convert_tokens_to_string(
                                        token_list[si:min(len(token_list), si + available_token)])
                                    sentences.append([ss, min(available_token, len(token_list) - si)])

                        sent_index = 0
                        while sent_index < len(sentences):
                            s = sentences[sent_index][0]
                            token = sentences[sent_index][1]
                            if not chunk:
                                chunk.append(f'[page: {str(page_num)}]')
                            if token <= available_token or (not has_para):
                                # Be sure to add at least one sentence
                                # (not has_para) is a patch of the previous sentence splitting
                                available_token -= token
                                chunk.append([s, page_num])
                                has_para = True
                                sent_index += 1
                            else:
                                assert has_para
                                if isinstance(chunk[-1], str) and re.fullmatch(r'^\[page: \d+\]$',
                                                                               chunk[-1]) is not None:
                                    chunk.pop()  # Redundant page information
                                res.append(
                                    Chunk(content=PARAGRAPH_SPLIT_SYMBOL.join(
                                        [x if isinstance(x, str) else x[0] for x in chunk]),
                                          metadata={
                                              'source': path,
                                              'title': title,
                                              'chunk_id': len(res)
                                          },
                                          token=parser_page_size - available_token))
                                overlap_txt = self._get_last_part(chunk)
                                if overlap_txt.strip():
                                    chunk = [f'[page: {str(chunk[-1][1])}]', overlap_txt]
                                    has_para = False
                                    available_token = parser_page_size - count_tokens(overlap_txt)
                                else:
                                    chunk = []
                                    has_para = False
                                    available_token = parser_page_size

                        # Has split this paragraph by sentence
                        idx += 1

        if has_para:
            if isinstance(chunk[-1], str) and re.fullmatch(r'^\[page: \d+\]$', chunk[-1]) is not None:
                chunk.pop()  # Redundant page information
            res.append(
                Chunk(content=PARAGRAPH_SPLIT_SYMBOL.join([x if isinstance(x, str) else x[0] for x in chunk]),
                      metadata={
                          'source': path,
                          'title': title,
                          'chunk_id': len(res)
                      },
                      token=parser_page_size - available_token))

        return res
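
    # _get_last_part builds the overlap text for the next chunk: it walks
    # backwards over the paragraphs of the chunk that was just emitted, stays on
    # the last page only, and gathers up to roughly 150 characters, taking whole
    # paragraphs when they fit and falling back to trailing sentences of an
    # oversized paragraph.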
    def _get_last_part(self, chunk: list) -> str:
        overlap = ''
        need_page = chunk[-1][1]  # Only need this page to prepend
        available_len = 150
        for i in range(len(chunk) - 1, -1, -1):
            if not (isinstance(chunk[i], list) and len(chunk[i]) == 2):
                continue
            if chunk[i][1] != need_page:
                return overlap
            para = chunk[i][0]
            if len(para) <= available_len:
                if overlap:
                    overlap = f'{para}{PARAGRAPH_SPLIT_SYMBOL}{overlap}'
                else:
                    overlap = f'{para}'
                available_len -= len(para)
                continue

            sentence_split_symbol = '. '
            if '。' in para:
                sentence_split_symbol = '。'
            sentences = re.split(r'\. |。', para)
            sentences = [sentence.strip() for sentence in sentences if sentence]
            for j in range(len(sentences) - 1, -1, -1):
                sent = sentences[j]
                if not sent.strip():
                    continue
                if len(sent) <= available_len:
                    if overlap:
                        overlap = f'{sent}{sentence_split_symbol}{overlap}'
                    else:
                        overlap = f'{sent}'
                    available_len -= len(sent)
                else:
                    return overlap

        return overlap
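

# A minimal usage sketch, assuming qwen-agent is installed and that
# 'docs/example.pdf' is a hypothetical, readable local file. DocParser.call
# accepts a dict (or a JSON string) with a single 'url' field and returns the
# chunked record described in the call() docstring.
if __name__ == '__main__':
    parser = DocParser()
    record = parser.call({'url': 'docs/example.pdf'})  # hypothetical path
    print(record['title'], len(record['raw']), 'chunks')
    for chunk in record['raw'][:3]:
        # Show chunk id, token count, and a preview of the chunk text
        print(chunk['metadata']['chunk_id'], chunk['token'], chunk['content'][:80])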