import json
import os
import re
import time
from collections import Counter
from typing import Dict, List, Optional, Union

import json5

from qwen_agent.log import logger
from qwen_agent.settings import DEFAULT_WORKSPACE
from qwen_agent.tools.base import BaseTool, register_tool
from qwen_agent.tools.storage import KeyNotExistsError, Storage
from qwen_agent.utils.str_processing import rm_cid, rm_continuous_placeholders, rm_hexadecimal
from qwen_agent.utils.tokenization_qwen import count_tokens
from qwen_agent.utils.utils import (get_file_type, hash_sha256, is_http_url, read_text_from_file,
                                    sanitize_chrome_file_path, save_url_to_local_work_dir)


def clean_paragraph(text):
    text = rm_cid(text)
    text = rm_hexadecimal(text)
    text = rm_continuous_placeholders(text)
    return text


PARAGRAPH_SPLIT_SYMBOL = '\n'


def parse_word(docx_path: str, extract_image: bool = False):
    if extract_image:
        raise ValueError('Currently, extracting images is not supported!')

    from docx import Document
    doc = Document(docx_path)
    content = []
    for para in doc.paragraphs:
        content.append({'text': para.text})
    for table in doc.tables:
        tbl = []
        for row in table.rows:
            tbl.append('|' + '|'.join([cell.text for cell in row.cells]) + '|')
        tbl = '\n'.join(tbl)
        content.append({'table': tbl})

    # Word documents have no fixed pagination, so the entire document is returned as one page
    return [{'page_num': 1, 'content': content}]
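
# A minimal usage sketch for parse_word. 'report.docx' is a hypothetical path, not part of
# this repo; the result is always a single page whose content mixes text and table items:
#
#     >>> pages = parse_word('report.docx')  # doctest: +SKIP
#     >>> pages[0]['page_num']
#     1
#     >>> [list(item)[0] for item in pages[0]['content']]  # e.g. ['text', 'text', 'table']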


def parse_ppt(path: str, extract_image: bool = False):
    if extract_image:
        raise ValueError('Currently, extracting images is not supported!')

    from pptx import Presentation
    ppt = Presentation(path)
    doc = []
    for slide_number, slide in enumerate(ppt.slides):
        page = {'page_num': slide_number + 1, 'content': []}
        for shape in slide.shapes:
            if not shape.has_text_frame and not shape.has_table:
                continue  # Skip shapes that carry neither text nor a table
            if shape.has_text_frame:
                for paragraph in shape.text_frame.paragraphs:
                    paragraph_text = ''.join(run.text for run in paragraph.runs)
                    paragraph_text = clean_paragraph(paragraph_text)
                    if paragraph_text.strip():
                        page['content'].append({'text': paragraph_text})
            if shape.has_table:
                tbl = []
                for row in shape.table.rows:
                    tbl.append('|' + '|'.join([cell.text for cell in row.cells]) + '|')
                tbl = '\n'.join(tbl)
                page['content'].append({'table': tbl})
        doc.append(page)
    return doc


def parse_txt(path: str):
    text = read_text_from_file(path)
    paras = text.split(PARAGRAPH_SPLIT_SYMBOL)
    content = []
    for p in paras:
        content.append({'text': p})

    # Plain text has no fixed pagination, so the entire document is returned as one page
    return [{'page_num': 1, 'content': content}]


def df_to_md(df) -> str:

    def replace_long_dashes(text):
        # Leave cells with real content untouched; only shorten pure dash/colon separator cells
        if text.replace('-', '').replace(':', '').strip():
            return text
        pattern = r'-{6,}'
        replaced_text = re.sub(pattern, '-----', text)
        return replaced_text

    from tabulate import tabulate
    md_table = tabulate(df, headers='keys', tablefmt='pipe', showindex=False)
    # Re-pad every cell and cap over-long dash separators so the markdown stays compact
    md_table = '\n'.join([
        '|'.join(replace_long_dashes(' ' + cell.strip() + ' ' if cell else '') for cell in row.split('|'))
        for row in md_table.split('\n')
    ])
    return md_table
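
# A minimal sketch of df_to_md with illustrative values. The exact separator width and
# alignment markers come from tabulate, so the output below is approximate:
#
#     >>> import pandas as pd
#     >>> print(df_to_md(pd.DataFrame({'name': ['a'], 'score': [1]})))  # doctest: +SKIP
#     | name | score |
#     | :----- | -----: |
#     | a | 1 |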


def parse_excel(file_path: str, extract_image: bool = False) -> List[dict]:
    if extract_image:
        raise ValueError('Currently, extracting images is not supported!')

    import pandas as pd
    excel_file = pd.ExcelFile(file_path)
    md_tables = []
    for sheet_name in excel_file.sheet_names:
        df = pd.read_excel(file_path, sheet_name=sheet_name)
        md_table = df_to_md(df)
        md_tables.append(f'### Sheet: {sheet_name}\n{md_table}')
    # Each sheet is returned as a separate page
    return [{'page_num': i + 1, 'content': [{'table': md_tables[i]}]} for i in range(len(md_tables))]
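
# A minimal sketch for parse_excel. 'book.xlsx' is a hypothetical two-sheet workbook;
# each sheet maps to its own page:
#
#     >>> pages = parse_excel('book.xlsx')  # doctest: +SKIP
#     >>> [page['page_num'] for page in pages]
#     [1, 2]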


def parse_csv(file_path: str, extract_image: bool = False) -> List[dict]:
    if extract_image:
        raise ValueError('Currently, extracting images is not supported!')

    import pandas as pd
    md_tables = []
    df = pd.read_csv(file_path, encoding_errors='replace', on_bad_lines='skip')
    md_table = df_to_md(df)
    md_tables.append(md_table)  # A CSV file contains only one table
    return [{'page_num': i + 1, 'content': [{'table': md_tables[i]}]} for i in range(len(md_tables))]


def parse_tsv(file_path: str, extract_image: bool = False) -> List[dict]:
    if extract_image:
        raise ValueError('Currently, extracting images is not supported!')

    import pandas as pd
    md_tables = []
    df = pd.read_csv(file_path, sep='\t', encoding_errors='replace', on_bad_lines='skip')
    md_table = df_to_md(df)
    md_tables.append(md_table)  # A TSV file contains only one table
    return [{'page_num': i + 1, 'content': [{'table': md_tables[i]}]} for i in range(len(md_tables))]


def parse_html_bs(path: str, extract_image: bool = False):
    if extract_image:
        raise ValueError('Currently, extracting images is not supported!')

    def pre_process_html(s):
        # Collapse consecutive newlines
        s = re.sub('\n+', '\n', s)
        # Remove site-specific boilerplate
        s = s.replace("Add to Qwen's Reading List", '')
        return s

    try:
        from bs4 import BeautifulSoup
    except ImportError:
        raise ValueError('Please install bs4 by `pip install beautifulsoup4`')

    bs_kwargs = {'features': 'lxml'}
    with open(path, 'r', encoding='utf-8') as f:
        soup = BeautifulSoup(f, **bs_kwargs)

    text = soup.get_text()
    if soup.title:
        title = str(soup.title.string)
    else:
        title = ''

    text = pre_process_html(text)
    paras = text.split(PARAGRAPH_SPLIT_SYMBOL)
    content = []
    for p in paras:
        p = clean_paragraph(p)
        if p.strip():
            content.append({'text': p})

    # The entire document is returned as one page
    return [{'page_num': 1, 'content': content, 'title': title}]
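
# A minimal sketch for parse_html_bs. 'page.html' is a hypothetical local file; the single
# returned page also carries the page title (or '' when absent):
#
#     >>> pages = parse_html_bs('page.html')  # doctest: +SKIP
#     >>> sorted(pages[0].keys())
#     ['content', 'page_num', 'title']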


def parse_pdf(pdf_path: str, extract_image: bool = False) -> List[dict]:
    # Todo: header and footer
    from pdfminer.high_level import extract_pages
    from pdfminer.layout import LTImage, LTRect, LTTextContainer

    doc = []
    for i, page_layout in enumerate(extract_pages(pdf_path)):
        page = {'page_num': page_layout.pageid, 'content': []}

        elements = []
        for element in page_layout:
            elements.append(element)

        # Init params for tables
        table_num = 0
        tables = []

        for element in elements:
            if isinstance(element, LTRect):
                if not tables:
                    tables = extract_tables(pdf_path, i)
                if table_num < len(tables):
                    table_string = table_converter(tables[table_num])
                    table_num += 1
                    if table_string:
                        page['content'].append({'table': table_string, 'obj': element})
            elif isinstance(element, LTTextContainer):
                # Line breaks within a paragraph are merged later in postprocess_page_content
                text = element.get_text()
                # Todo: Further analysis using font
                font = get_font(element)
                if text.strip():
                    new_content_item = {'text': text, 'obj': element}
                    if font:
                        new_content_item['font-size'] = round(font[1])
                        # new_content_item['font-name'] = font[0]
                    page['content'].append(new_content_item)
            elif extract_image and isinstance(element, LTImage):
                # Todo: ocr
                raise ValueError('Currently, extracting images is not supported!')
            else:
                pass

        # Merge elements and drop duplicated table/text recognitions
        page['content'] = postprocess_page_content(page['content'])
        doc.append(page)

    return doc
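
# A minimal sketch for parse_pdf. 'paper.pdf' is a hypothetical local file; text items may
# carry a 'font-size' attribute that postprocess_page_content uses for paragraph merging:
#
#     >>> pages = parse_pdf('paper.pdf')  # doctest: +SKIP
#     >>> pages[0]['content'][0]  # e.g. {'text': 'Abstract ...', 'font-size': 12}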


def postprocess_page_content(page_content: list) -> list:
    # Remove repeated recognitions of the same region:
    # some documents yield both an LTRect (table) and an LTTextContainer for one area
    table_obj = [p['obj'] for p in page_content if 'table' in p]
    tmp = []
    for p in page_content:
        repetitive = False
        if 'text' in p:
            for t in table_obj:
                # Heuristic bbox check: treat the text block as a duplicate of the table region
                if (t.bbox[0] <= p['obj'].bbox[0] and p['obj'].bbox[1] <= t.bbox[1] and
                        t.bbox[2] <= p['obj'].bbox[2] and p['obj'].bbox[3] <= t.bbox[3]):
                    repetitive = True
                    break
        if not repetitive:
            tmp.append(p)
    page_content = tmp

    # Merge paragraphs that were split by mistake
    new_page_content = []
    for p in page_content:
        if (new_page_content and 'text' in new_page_content[-1] and 'text' in p and
                abs(p.get('font-size', 12) - new_page_content[-1].get('font-size', 12)) < 2 and
                p['obj'].height < p.get('font-size', 12) + 1):
            # Merge lines that belong to the same paragraph
            new_page_content[-1]['text'] += f' {p["text"]}'
            # new_page_content[-1]['font-name'] = p.get('font-name', '')
            new_page_content[-1]['font-size'] = p.get('font-size', 12)
        else:
            p.pop('obj')
            new_page_content.append(p)
    for i in range(len(new_page_content)):
        if 'text' in new_page_content[i]:
            new_page_content[i]['text'] = clean_paragraph(new_page_content[i]['text'])
    return new_page_content
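
# A minimal sketch of the paragraph-merge step. SimpleNamespace stands in for pdfminer
# layout objects here (only .bbox and .height are needed); values are illustrative:
#
#     >>> from types import SimpleNamespace
#     >>> def line(y):
#     ...     return SimpleNamespace(bbox=(0, y, 100, y + 10), height=10.0)
#     >>> content = [{'text': 'Hello', 'obj': line(700), 'font-size': 12},
#     ...            {'text': 'world.', 'obj': line(688), 'font-size': 12}]
#     >>> postprocess_page_content(content)
#     [{'text': 'Hello world.', 'font-size': 12}]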


def get_font(element):
    from pdfminer.layout import LTChar, LTTextContainer

    fonts_list = []
    for text_line in element:
        if isinstance(text_line, LTTextContainer):
            for character in text_line:
                if isinstance(character, LTChar):
                    fonts_list.append((character.fontname, character.size))

    if fonts_list:
        # Keep duplicates so the count reflects how often each (name, size) pair occurs;
        # deduplicating first would make most_common() arbitrary
        counter = Counter(fonts_list)
        most_common_fonts = counter.most_common(1)[0][0]
        return most_common_fonts
    else:
        return []


def extract_tables(pdf_path, page_num):
    import pdfplumber

    # Use a context manager so the file handle is released promptly
    with pdfplumber.open(pdf_path) as pdf:
        table_page = pdf.pages[page_num]
        tables = table_page.extract_tables()
    return tables


def table_converter(table):
    table_string = ''
    for row in table:
        # Replace None cells with the string 'None' and flatten in-cell line breaks
        cleaned_row = ['None' if item is None else item.replace('\n', ' ') for item in row]
        table_string += '|' + '|'.join(cleaned_row) + '|\n'
    table_string = table_string.rstrip('\n')
    return table_string
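
# A minimal sketch of table_converter with illustrative values:
#
#     >>> table_converter([['name', 'score'], ['a', None]])
#     '|name|score|\n|a|None|'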


PARSER_SUPPORTED_FILE_TYPES = ['pdf', 'docx', 'pptx', 'txt', 'html', 'csv', 'tsv', 'xlsx', 'xls']


def get_plain_doc(doc: list):
    paras = []
    for page in doc:
        for para in page['content']:
            for k, v in para.items():
                if k in ['text', 'table', 'image']:
                    paras.append(v)
    return PARAGRAPH_SPLIT_SYMBOL.join(paras)
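
# A minimal sketch of get_plain_doc with illustrative values:
#
#     >>> get_plain_doc([{'page_num': 1, 'content': [{'text': 'Hello'}, {'table': '|a|b|'}]}])
#     'Hello\n|a|b|'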


@register_tool('simple_doc_parser')
class SimpleDocParser(BaseTool):
    description = f'Extract the contents of a document. Supported types: {"/".join(PARSER_SUPPORTED_FILE_TYPES)}'
    parameters = [{
        'name': 'url',
        'type': 'string',
        'description': 'Path of the file to extract, either a local path or a downloadable http(s) link',
        'required': True
    }]

    def __init__(self, cfg: Optional[Dict] = None):
        super().__init__(cfg)
        self.data_root = self.cfg.get('path', os.path.join(DEFAULT_WORKSPACE, 'tools', self.name))
        self.extract_image = self.cfg.get('extract_image', False)
        self.structured_doc = self.cfg.get('structured_doc', False)
        self.db = Storage({'storage_root_path': self.data_root})

    def call(self, params: Union[str, dict], **kwargs) -> Union[str, list]:
        """Parse the document at the given path or URL, and return the formatted content.

        Returns:
            Extracted doc as plain text, or the following list format:
                [
                    {'page_num': 1,
                     'content': [
                         {'text': 'This is one paragraph'},
                         {'table': 'This is one table'}
                     ],
                     'title': 'If extracted, this is the title of the doc.'},
                    {'page_num': 2,
                     'content': [
                         {'text': 'This is one paragraph'},
                         {'table': 'This is one table'}
                     ]}
                ]
        """
        params = self._verify_json_format_args(params)
        path = params['url']
        cached_name_ori = f'{hash_sha256(path)}_ori'
        try:
            # Directly load the previously parsed doc
            parsed_file = self.db.get(cached_name_ori)
            try:
                parsed_file = json5.loads(parsed_file)
            except ValueError:
                logger.warning(f'Encountered ValueError raised by json5. Fall back to json. File: {cached_name_ori}')
                parsed_file = json.loads(parsed_file)
            logger.info(f'Read parsed {path} from cache.')
        except KeyNotExistsError:
            logger.info(f'Start parsing {path}...')
            time1 = time.time()

            f_type = get_file_type(path)
            if f_type in PARSER_SUPPORTED_FILE_TYPES:
                # Leave http(s) URLs and Windows drive paths untouched; sanitize everything else
                if not (path.startswith('https://') or path.startswith('http://') or
                        re.match(r'^[A-Za-z]:\\', path) or re.match(r'^[A-Za-z]:/', path)):
                    path = sanitize_chrome_file_path(path)

            os.makedirs(self.data_root, exist_ok=True)
            if is_http_url(path):
                # Download the online url to a local working directory first
                tmp_file_root = os.path.join(self.data_root, hash_sha256(path))
                os.makedirs(tmp_file_root, exist_ok=True)
                path = save_url_to_local_work_dir(path, tmp_file_root)

            if f_type == 'pdf':
                parsed_file = parse_pdf(path, self.extract_image)
            elif f_type == 'docx':
                parsed_file = parse_word(path, self.extract_image)
            elif f_type == 'pptx':
                parsed_file = parse_ppt(path, self.extract_image)
            elif f_type == 'txt':
                parsed_file = parse_txt(path)
            elif f_type == 'html':
                parsed_file = parse_html_bs(path, self.extract_image)
            elif f_type == 'csv':
                parsed_file = parse_csv(path, self.extract_image)
            elif f_type == 'tsv':
                parsed_file = parse_tsv(path, self.extract_image)
            elif f_type in ['xlsx', 'xls']:
                parsed_file = parse_excel(path, self.extract_image)
            else:
                raise ValueError(
                    f'Failed: The current parser does not support this file type! '
                    f'Supported types: {"/".join(PARSER_SUPPORTED_FILE_TYPES)}')

            for page in parsed_file:
                for para in page['content']:
                    # Todo: More attribute types
                    para['token'] = count_tokens(para.get('text', para.get('table')))
            time2 = time.time()
            logger.info(f'Finished parsing {path}. Time spent: {time2 - time1} seconds.')
            # Cache the parsed doc
            self.db.put(cached_name_ori, json.dumps(parsed_file, ensure_ascii=False, indent=2))

        if not self.structured_doc:
            return get_plain_doc(parsed_file)
        else:
            return parsed_file
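
# A minimal usage sketch for SimpleDocParser. 'paper.pdf' is a hypothetical local file;
# with structured_doc=True the call returns the page list instead of plain text:
#
#     >>> parser = SimpleDocParser({'structured_doc': True})
#     >>> pages = parser.call({'url': 'paper.pdf'})  # doctest: +SKIP
#     >>> pages[0]['page_num']
#     1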