-
Notifications
You must be signed in to change notification settings - Fork 231
/
parse.py
280 lines (238 loc) · 11.4 KB
/
parse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
import os
import re
from typing import List, Tuple, Optional, Dict
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
import fitz # PyMuPDF
import shapely.geometry as sg
from shapely.geometry.base import BaseGeometry
from shapely.validation import explain_validity
import concurrent.futures
# Default prompts sent to the model. They are written in Chinese; each one can be
# replaced by the caller through the prompt dict keys 'prompt', 'rect_prompt'
# and 'role_prompt' respectively.

# Main instruction: transcribe the page image to markdown, keep the source
# language, emit no explanatory text, use $$ $$ / $ $ for formulas, and ignore
# long straight lines and page numbers.
DEFAULT_PROMPT = """使用markdown语法,将图片中识别到的文字转换为markdown格式输出。你必须做到:
1. 输出和使用识别到的图片的相同的语言,例如,识别到英语的字段,输出的内容必须是英语。
2. 不要解释和输出无关的文字,直接输出图片中的内容。例如,严禁输出 “以下是我根据图片内容生成的markdown文本:”这样的例子,而是应该直接输出markdown。
3. 内容不要包含在```markdown ```中、段落公式使用 $$ $$ 的形式、行内公式使用 $ $ 的形式、忽略掉长直线、忽略掉页码。
再次强调,不要解释和输出无关的文字,直接输出图片中的内容。
"""
# Extra instruction used when a page has framed regions: regions that are tables
# or pictures should be emitted as ![]() image links, everything else as text.
# NOTE(review): the '%s' looks like a placeholder for the region image names - confirm
# how the consumer fills it in.
DEFAULT_RECT_PROMPT = """图片中用红色框和名称(%s)标注出了一些区域。如果区域是表格或者图片,使用 ![]() 的形式插入到输出内容中,否则直接输出文字内容。
"""
# Role/system prompt: the model acts as a PDF parser that answers in markdown and LaTeX.
DEFAULT_ROLE_PROMPT = """你是一个PDF文档解析器,使用markdown和latex语法输出图片的内容。
"""
def _is_near(rect1: BaseGeometry, rect2: BaseGeometry, distance: float = 20) -> bool:
    """
    Tell whether two geometries lie closer together than ``distance``.

    Each geometry is first grown with ``buffer(0.1)``; the gap between the two
    grown shapes is then compared (strictly) against ``distance``.
    """
    padded_first = rect1.buffer(0.1)
    padded_second = rect2.buffer(0.1)
    gap = padded_first.distance(padded_second)
    return gap < distance
def _is_horizontal_near(rect1: BaseGeometry, rect2: BaseGeometry, distance: float = 100) -> bool:
    """
    Tell whether a horizontal line and another geometry with the same horizontal
    span are vertically closer than ``distance``.

    Bounds are read as ``(minx, miny, maxx, maxy)``. The answer is ``True`` only when
    all three conditions hold:
      * at least one geometry is flat (height below 0.1),
      * both left edges and both right edges coincide within 0.1,
      * the two ``maxy`` values differ by less than ``distance``.
    """
    first = rect1.bounds
    second = rect2.bounds

    first_is_flat = abs(first[3] - first[1]) < 0.1
    if not first_is_flat and not abs(second[3] - second[1]) < 0.1:
        return False

    left_edges_match = abs(first[0] - second[0]) < 0.1
    if not left_edges_match or not abs(first[2] - second[2]) < 0.1:
        return False

    return abs(first[3] - second[3]) < distance
def _union_rects(rect1: BaseGeometry, rect2: BaseGeometry) -> BaseGeometry:
    """
    Build the axis-aligned box that encloses both geometries.

    The two inputs are unioned and the bounding box of that union is returned as a
    new box geometry.
    """
    combined = rect1.union(rect2)
    return sg.box(*combined.bounds)
def _merge_rects(
        rect_list: List[BaseGeometry],
        distance: float = 20,
        horizontal_distance: Optional[float] = None,
) -> List[BaseGeometry]:
    """
    Repeatedly merge rectangles that are near each other until nothing changes.

    Two rectangles are merged (replaced by the bounding box of both) when
    ``_is_near`` holds for ``distance`` or, if ``horizontal_distance`` is given,
    when ``_is_horizontal_near`` holds for ``horizontal_distance``.

    Args:
        rect_list: Rectangles to merge. The list is not modified.
        distance: Proximity threshold passed to ``_is_near``.
        horizontal_distance: Optional threshold passed to ``_is_horizontal_near``;
            ``None`` disables that check.

    Returns:
        A new list with the merged rectangles.
    """
    # Work on a copy so the caller's list is left intact.
    pending = list(rect_list)
    use_horizontal = horizontal_distance is not None

    merged = True
    while merged:
        merged = False
        survivors = []
        while pending:
            rect = pending.pop(0)
            # Rebuild the remainder instead of removing entries from the list
            # being iterated: in-loop removal skips the following element and
            # matches by equality rather than identity.
            remaining = []
            for other_rect in pending:
                if _is_near(rect, other_rect, distance) or (
                        use_horizontal and _is_horizontal_near(rect, other_rect, horizontal_distance)):
                    rect = _union_rects(rect, other_rect)
                    merged = True
                else:
                    remaining.append(other_rect)
            pending = remaining
            survivors.append(rect)
        # A grown rectangle may now reach others, so run another pass whenever
        # something was merged.
        pending = survivors
    return pending
def _adsorb_rects_to_rects(
        source_rects: List[BaseGeometry],
        target_rects: List[BaseGeometry],
        distance: float = 10,
) -> Tuple[List[BaseGeometry], List[BaseGeometry]]:
    """
    Absorb source rectangles into the first target rectangle they are near.

    Each source rectangle is compared with the targets in order. The first target
    for which ``_is_near`` holds is replaced, in place in ``target_rects``, by the
    bounding box of both. Source rectangles that match no target are collected.

    Returns:
        ``(unabsorbed_source_rects, target_rects)`` where the second item is the
        same list object that was passed in.
    """
    leftover = []
    for candidate in source_rects:
        for position, target in enumerate(target_rects):
            if _is_near(candidate, target, distance):
                target_rects[position] = _union_rects(candidate, target)
                break
        else:
            # No target was close enough to take this rectangle.
            leftover.append(candidate)
    return leftover, target_rects
def _parse_rects(page: fitz.Page) -> List[Tuple[float, float, float, float]]:
    """
    Locate the non-text regions (drawings and images) of a page.

    Steps:
      1. Collect the rectangles of all drawings (except short thin lines) and images.
      2. Merge rectangles that are near each other and drop invalid geometries.
      3. Absorb text blocks into the merged regions: blocks with long lines only
         when they practically touch a region, blocks with short lines when close.
      4. Merge once more and discard regions not larger than 20 in width and height.

    Args:
        page: The PyMuPDF page to analyse.

    Returns:
        The bounds ``(minx, miny, maxx, maxy)`` of every remaining region.
    """

    def _is_short_line(drawing) -> bool:
        # Height below 1 and width below 30: a short horizontal stroke.
        rect = drawing['rect']
        return abs(rect[3] - rect[1]) < 1 and abs(rect[2] - rect[0]) < 30

    def _is_large_content(block) -> bool:
        # block[4] is the block text; "large" means more than 5 characters per
        # line on average.
        text = block[4]
        return (len(text) / max(1, len(text.split('\n')))) > 5

    # Drawings, converted to shapely boxes, without the short lines.
    rect_list = [sg.box(*drawing['rect']) for drawing in page.get_drawings() if not _is_short_line(drawing)]
    # Image areas are handled together with the drawings.
    rect_list += [sg.box(*image['bbox']) for image in page.get_image_info()]

    merged_rects = _merge_rects(rect_list, distance=10, horizontal_distance=100)
    merged_rects = [rect for rect in merged_rects if explain_validity(rect) == 'Valid Geometry']

    # Read the text blocks once and split them by average line length.
    small_text_area_rects = []
    large_text_area_rects = []
    for block in page.get_text('blocks'):
        block_rect = sg.box(*block[:4])
        if _is_large_content(block):
            large_text_area_rects.append(block_rect)
        else:
            small_text_area_rects.append(block_rect)

    # Large text is absorbed only when it (almost) intersects a region ...
    _, merged_rects = _adsorb_rects_to_rects(large_text_area_rects, merged_rects, distance=0.1)
    # ... small text already when it is merely close to one.
    _, merged_rects = _adsorb_rects_to_rects(small_text_area_rects, merged_rects, distance=5)

    # Absorbing text may have brought regions together, so merge again.
    merged_rects = _merge_rects(merged_rects, distance=10)

    # Drop regions that are too small to be meaningful.
    merged_rects = [
        rect for rect in merged_rects
        if rect.bounds[2] - rect.bounds[0] > 20 and rect.bounds[3] - rect.bounds[1] > 20
    ]
    return [rect.bounds for rect in merged_rects]
def _parse_pdf_to_images(pdf_path: str, output_dir: str = './') -> List[Tuple[str, List[str]]]:
    """
    Render every page of a PDF to images inside ``output_dir``.

    For each page the regions found by ``_parse_rects`` are saved as
    ``{page_index}_{region_index}.png`` (4x zoom). Afterwards every region is
    framed in red and labelled with its file name on the page, and the annotated
    page is saved as ``{page_index}.png`` (3x zoom).

    Args:
        pdf_path: Path of the PDF file to open.
        output_dir: Existing directory that receives the images.

    Returns:
        One ``(page_image_path, region_image_names)`` tuple per page. The page
        image is a path joined with ``output_dir``; the region names are bare
        file names.
    """
    image_infos = []
    pdf_document = fitz.open(pdf_path)
    try:
        for page_index, page in enumerate(pdf_document):
            logging.info(f'parse page: {page_index}')
            fitz_rects = [fitz.Rect(rect) for rect in _parse_rects(page)]

            # Pass 1: crop every region while the page is still unmodified, so
            # frames and labels of other regions cannot leak into a crop.
            rect_images = []
            for index, fitz_rect in enumerate(fitz_rects):
                name = f'{page_index}_{index}.png'
                pix = page.get_pixmap(clip=fitz_rect, matrix=fitz.Matrix(4, 4))
                pix.save(os.path.join(output_dir, name))
                rect_images.append(name)

            # Pass 2: annotate the page for the model.
            for name, fitz_rect in zip(rect_images, fitz_rects):
                # Red outline, one unit larger than the region on every side.
                big_fitz_rect = fitz.Rect(fitz_rect.x0 - 1, fitz_rect.y0 - 1, fitz_rect.x1 + 1, fitz_rect.y1 + 1)
                page.draw_rect(big_fitz_rect, color=(1, 0, 0), width=1)
                # Label in the top-left corner of the region, slightly inset.
                text_x = fitz_rect.x0 + 2
                text_y = fitz_rect.y0 + 10
                text_rect = fitz.Rect(text_x, text_y - 9, text_x + 80, text_y + 2)
                # White background first, so the red text stays readable.
                page.draw_rect(text_rect, color=(1, 1, 1), fill=(1, 1, 1))
                page.insert_text((text_x, text_y), name, fontsize=10, color=(1, 0, 0))

            page_image_with_rects = page.get_pixmap(matrix=fitz.Matrix(3, 3))
            page_image = os.path.join(output_dir, f'{page_index}.png')
            page_image_with_rects.save(page_image)
            image_infos.append((page_image, rect_images))
    finally:
        # Release the document even when rendering a page fails.
        pdf_document.close()
    return image_infos
def _gpt_parse_images(
        image_infos: List[Tuple[str, List[str]]],
        prompt_dict: Optional[Dict] = None,
        output_dir: str = './',
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        model: str = 'gpt-4o',
        verbose: bool = False,
        gpt_worker: int = 1,
        **args
) -> str:
    """
    Convert the annotated page images to markdown with a GPT agent.

    Every page is sent to its own ``Agent`` in a thread pool. The page results are
    cleaned of a surrounding ```markdown fence, joined in page order with blank
    lines, written to ``output.md`` inside ``output_dir`` and returned.

    Args:
        image_infos: ``(page_image_path, region_image_names)`` per page.
        prompt_dict: Optional overrides under the keys ``'prompt'``,
            ``'rect_prompt'`` and ``'role_prompt'``; missing keys fall back to the
            module defaults. A ``'%s'`` inside the rect prompt is replaced by the
            comma-separated region image names; without it the names are appended.
        output_dir: Directory that receives ``output.md``.
        api_key: Passed to ``Agent``.
        base_url: Passed to ``Agent``.
        model: Model name passed to ``Agent``.
        verbose: Passed to ``agent.run`` as ``display``.
        gpt_worker: Number of worker threads.
        **args: Extra keyword arguments forwarded to ``Agent``.

    Returns:
        The markdown of all pages, separated by blank lines.
    """
    from GeneralAgent import Agent

    user_prompts = prompt_dict if isinstance(prompt_dict, dict) else {}

    def _resolve_prompt(key: str, default: str) -> str:
        # Prefer the caller's prompt and log which variant is used.
        if key in user_prompts:
            logging.info(f"{key} is provided, using user prompt.")
            return user_prompts[key]
        logging.info(f"{key} is not provided, using default prompt.")
        return default

    prompt = _resolve_prompt('prompt', DEFAULT_PROMPT)
    rect_prompt = _resolve_prompt('rect_prompt', DEFAULT_RECT_PROMPT)
    role_prompt = _resolve_prompt('role_prompt', DEFAULT_ROLE_PROMPT)

    def _process_page(index: int, image_info: Tuple[str, List[str]]) -> Tuple[int, str]:
        logging.info(f'gpt parse page: {index}')
        agent = Agent(role=role_prompt, api_key=api_key, base_url=base_url, disable_python_run=True, model=model, **args)
        page_image, rect_images = image_info
        local_prompt = prompt
        if rect_images:
            names = ', '.join(rect_images)
            if '%s' in rect_prompt:
                # Put the names where the prompt asks for them instead of
                # sending a literal '%s' to the model.
                local_prompt += rect_prompt.replace('%s', names)
            else:
                local_prompt += rect_prompt + names
        # NOTE(review): assumes agent.run returns the answer as a str - confirm.
        content = agent.run([local_prompt, {'image': page_image}], display=verbose)
        return index, content

    # Results arrive in completion order; the index puts them back in page order.
    contents = [None] * len(image_infos)
    with concurrent.futures.ThreadPoolExecutor(max_workers=gpt_worker) as executor:
        futures = [executor.submit(_process_page, index, image_info) for index, image_info in enumerate(image_infos)]
        for future in concurrent.futures.as_completed(futures):
            index, content = future.result()
            # The model sometimes still wraps its answer in ```markdown ... ```.
            if '```markdown' in content:
                content = content.replace('```markdown\n', '')
                last_backticks_pos = content.rfind('```')
                if last_backticks_pos != -1:
                    content = content[:last_backticks_pos] + content[last_backticks_pos + 3:]
            contents[index] = content

    markdown = '\n\n'.join(contents)
    output_path = os.path.join(output_dir, 'output.md')
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(markdown)
    return markdown
def parse_pdf(
        pdf_path: str,
        output_dir: str = './',
        prompt: Optional[Dict] = None,
        api_key: Optional[str] = None,
        base_url: Optional[str] = None,
        model: str = 'gpt-4o',
        verbose: bool = False,
        gpt_worker: int = 1,
        **args
) -> Tuple[str, List[str]]:
    """
    Parse a PDF file to a markdown file.

    The pages are rendered to images, the images are converted to markdown by the
    GPT agent, and the result is written to ``output.md`` in ``output_dir``.

    Args:
        pdf_path: Path of the PDF file.
        output_dir: Directory for the images and ``output.md``; created if missing.
        prompt: Optional dict with the keys ``'prompt'``, ``'rect_prompt'`` and
            ``'role_prompt'`` to override the default prompts.
        api_key: API key forwarded to the agent.
        base_url: API base URL forwarded to the agent.
        model: Model name forwarded to the agent.
        verbose: When true, show the agent output and keep the annotated page images.
        gpt_worker: Number of pages processed in parallel.
        **args: Extra keyword arguments forwarded to the agent.

    Returns:
        ``(markdown_content, region_image_names)``. The region image files stay in
        ``output_dir``.
    """
    os.makedirs(output_dir, exist_ok=True)

    image_infos = _parse_pdf_to_images(pdf_path, output_dir=output_dir)
    content = _gpt_parse_images(
        image_infos=image_infos,
        output_dir=output_dir,
        prompt_dict=prompt,
        api_key=api_key,
        base_url=base_url,
        model=model,
        verbose=verbose,
        gpt_worker=gpt_worker,
        **args
    )

    all_rect_images = []
    for page_image, rect_images in image_infos:
        # The region images are part of the result in every mode.
        all_rect_images.extend(rect_images)
        # The annotated page images are only an intermediate product; keep them
        # for inspection in verbose mode, remove them otherwise.
        if not verbose and os.path.exists(page_image):
            os.remove(page_image)
    return content, all_rect_images