|
11 | 11 | import logging
|
12 | 12 | import re
|
13 | 13 | import warnings
|
| 14 | +from abc import ABC, abstractmethod |
14 | 15 | from collections import defaultdict
|
15 | 16 | from collections.abc import Callable
|
16 | 17 | from functools import partial
|
17 | 18 | from io import BytesIO
|
18 | 19 | from typing import TYPE_CHECKING, Iterable, Iterator, Literal, TypeAlias, get_args
|
19 | 20 |
|
| 21 | +import numpy as np |
| 22 | +from pdf2image import convert_from_bytes |
20 | 23 | from PIL import Image
|
21 | 24 | from pydantic import BaseModel
|
| 25 | +from unstructured.file_utils.filetype import FileType, detect_filetype |
22 | 26 |
|
23 | 27 | import pathway as pw
|
24 | 28 | from pathway.internals import udfs
|
25 | 29 | from pathway.internals.config import _check_entitlements
|
26 | 30 | from pathway.optional_import import optional_imports
|
27 |
| -from pathway.xpacks.llm import llms, prompts |
| 31 | +from pathway.xpacks.llm import _parser_utils, llms, prompts |
28 | 32 | from pathway.xpacks.llm._utils import _prepare_executor
|
29 | 33 | from pathway.xpacks.llm.constants import DEFAULT_VISION_MODEL
|
30 | 34 |
|
31 | 35 | if TYPE_CHECKING:
|
32 | 36 | with optional_imports("xpack-llm-docs"):
|
| 37 | + from paddleocr import PaddleOCR, PPStructureV3 |
33 | 38 | from unstructured.documents.elements import Element
|
34 | 39 |
|
35 | 40 | logger = logging.getLogger(__name__)
|
@@ -1091,3 +1096,197 @@ def replace_newline(match: re.Match):
|
1091 | 1096 |
|
1092 | 1097 | modified_text = re.sub(r"\n(\w)", replace_newline, text)
|
1093 | 1098 | return modified_text
|
| 1099 | + |
| 1100 | + |
class _PaddleParser(ABC):
    """
    Abstract wrapper around a Paddle OCR pipeline.

    Concrete subclasses know how to turn the raw output of a specific
    pipeline flavor into plain text via ``extract_text``; ``create_for``
    selects the matching subclass for a given pipeline instance.
    """

    pipeline: PaddleOCR | PPStructureV3

    def __init__(self, pipeline: PaddleOCR | PPStructureV3):
        self.pipeline = pipeline

    def parse(self, image: np.ndarray) -> str:
        # Run OCR on the image, then convert the raw result to text.
        prediction = self.pipeline.predict(image)
        return self.extract_text(prediction)

    @abstractmethod
    def extract_text(self, ocr_result: list) -> str:
        pass

    @staticmethod
    def create_for(pipeline: PaddleOCR | PPStructureV3) -> _PaddleParser:
        # Factory: dispatch on the concrete runtime type of the pipeline.
        with optional_imports("xpack-llm-docs"):
            from paddleocr import PaddleOCR, PPStructureV3

        if isinstance(pipeline, PPStructureV3):
            return _PaddlePPStructureV3Parser(pipeline)
        if isinstance(pipeline, PaddleOCR):
            return _PaddleOCRParser(pipeline)
        raise NotImplementedError(
            f"Extractor for {type(pipeline)} is not implemented."
        )
| 1133 | + |
| 1134 | + |
class _PaddlePPStructureV3Parser(_PaddleParser):
    """Extracts markdown text from ``PPStructureV3`` OCR results."""

    def extract_text(self, ocr_result: list) -> str:
        markdown_pages = []

        for page_result in ocr_result:
            try:
                markdown_pages.append(page_result.markdown)
            except AttributeError:
                # A malformed page result is skipped rather than failing the doc.
                logger.error("Failed to extract text from OCR result.")
                continue

        # Let the pipeline merge the per-page markdown into one document.
        return self.pipeline.concatenate_markdown_pages(markdown_pages)
| 1149 | + |
| 1150 | + |
class _PaddleOCRParser(_PaddleParser):
    """Extracts plain text from ``PaddleOCR`` results."""

    def extract_text(self, ocr_result: list) -> str:
        parts: list[str] = []
        for page_result in ocr_result:
            try:
                recognized_lines = page_result["rec_texts"]
            except KeyError:
                # A page result without recognized text is skipped.
                logger.error("Failed to extract text from OCR result.")
                continue
            # One space-joined chunk per page, pages separated by blank lines.
            parts.append(" ".join(recognized_lines) + "\n\n")
        return "".join(parts)
| 1162 | + |
| 1163 | + |
| 1164 | +class PaddleOCRParser(pw.UDF): |
| 1165 | + """ |
| 1166 | + A class to parse images, PDFs and PPTX slides using PaddleOCR. |
| 1167 | +
|
| 1168 | + Args: |
| 1169 | + pipeline: A Paddle pipeline object. Currently PaddleOCR and PPStructureV3 are supported. |
| 1170 | + If not provided, a default PPStructureV3 pipeline will be used. |
| 1171 | + Use PPStructureV3 for better accuracy on documents with complex layouts. PaddleOCR can be used for |
| 1172 | + simpler documents, extracting only text but may be faster. |
| 1173 | + concatenate_pages: Whether to concatenate multi-paged documents into a single output. Defaults to False. |
| 1174 | + intermediate_image_format: Intermediate image format used when converting PDFs to images. |
| 1175 | + Defaults to ``"jpg"`` for speed and memory use. |
| 1176 | + max_image_size: Maximum allowed size of the images in bytes. Default is 15 MB. |
| 1177 | + downsize_horizontal_width: Width to which images are downsized if necessary. |
| 1178 | + Default is 1920. |
| 1179 | + cache_strategy: Defines the caching mechanism. To enable caching, |
| 1180 | + a valid :py:class:``~pathway.udfs.CacheStrategy`` should be provided. |
| 1181 | + Defaults to None. |
| 1182 | + async_mode: Mode of execution for the UDF, either ``"batch_async"`` or ``"fully_async"``. |
| 1183 | + Default is ``"batch_async"``. |
| 1184 | + """ |
| 1185 | + |
| 1186 | + parser: _PaddleParser |
| 1187 | + intermediate_image_format: str |
| 1188 | + max_image_size: int |
| 1189 | + downsize_horizontal_width: int |
| 1190 | + |
| 1191 | + def __init__( |
| 1192 | + self, |
| 1193 | + pipeline: PaddleOCR | PPStructureV3 | None = None, |
| 1194 | + *, |
| 1195 | + concatenate_pages: bool = False, |
| 1196 | + intermediate_image_format: str = "jpg", |
| 1197 | + max_image_size: int = 15 * 1024 * 1024, |
| 1198 | + downsize_horizontal_width: int = 1920, |
| 1199 | + cache_strategy: udfs.CacheStrategy | None = None, |
| 1200 | + async_mode: Literal["batch_async", "fully_async"] = "batch_async", |
| 1201 | + ): |
| 1202 | + super().__init__( |
| 1203 | + executor=_prepare_executor(async_mode=async_mode), |
| 1204 | + cache_strategy=cache_strategy, |
| 1205 | + ) |
| 1206 | + |
| 1207 | + with optional_imports("xpack-llm-docs"): |
| 1208 | + import paddleocr # noqa:F401 |
| 1209 | + |
| 1210 | + self.intermediate_image_format = intermediate_image_format |
| 1211 | + self.max_image_size = max_image_size |
| 1212 | + self.downsize_horizontal_width = downsize_horizontal_width |
| 1213 | + self.concatenate_pages = concatenate_pages |
| 1214 | + |
| 1215 | + if pipeline is None: |
| 1216 | + pipeline = self._default_pipeline() |
| 1217 | + |
| 1218 | + self.parser = _PaddleParser.create_for(pipeline) |
| 1219 | + |
| 1220 | + def _default_pipeline(self) -> PPStructureV3: |
| 1221 | + with optional_imports("xpack-llm-docs"): |
| 1222 | + from paddleocr import PPStructureV3 |
| 1223 | + return PPStructureV3( |
| 1224 | + use_table_recognition=False, |
| 1225 | + use_doc_orientation_classify=False, |
| 1226 | + use_doc_unwarping=False, |
| 1227 | + use_textline_orientation=False, |
| 1228 | + use_seal_recognition=False, |
| 1229 | + use_formula_recognition=False, |
| 1230 | + use_chart_recognition=False, |
| 1231 | + use_region_detection=False, |
| 1232 | + ) |
| 1233 | + |
| 1234 | + def _normalize_input( |
| 1235 | + self, |
| 1236 | + contents: bytes, |
| 1237 | + ) -> tuple[list[Image.Image], FileType | None]: |
| 1238 | + byte_file = io.BytesIO(contents) |
| 1239 | + filetype = detect_filetype(file=byte_file) |
| 1240 | + |
| 1241 | + match filetype: |
| 1242 | + case FileType.PPT | FileType.PPTX: |
| 1243 | + contents = _parser_utils._convert_pptx_to_pdf(contents) |
| 1244 | + images = convert_from_bytes( |
| 1245 | + contents, fmt=self.intermediate_image_format |
| 1246 | + ) |
| 1247 | + case FileType.PDF: |
| 1248 | + images = convert_from_bytes( |
| 1249 | + contents, fmt=self.intermediate_image_format |
| 1250 | + ) |
| 1251 | + case _ as filetype: |
| 1252 | + try: |
| 1253 | + images = [Image.open(io.BytesIO(contents)).convert("RGB")] |
| 1254 | + except Exception as e: |
| 1255 | + logger.error(f"Failed to parse provided file. Reason: {e}") |
| 1256 | + return [], None |
| 1257 | + |
| 1258 | + images = [ |
| 1259 | + _parser_utils.maybe_downscale( |
| 1260 | + img, |
| 1261 | + max_image_size=self.max_image_size, |
| 1262 | + downsize_horizontal_width=self.downsize_horizontal_width, |
| 1263 | + ) |
| 1264 | + for img in images |
| 1265 | + ] |
| 1266 | + |
| 1267 | + return images, filetype |
| 1268 | + |
| 1269 | + async def __wrapped__(self, contents: bytes) -> list[tuple[str, dict]]: |
| 1270 | + images, original_filetype = self._normalize_input(contents) |
| 1271 | + |
| 1272 | + def metadata(page_number: int) -> dict: |
| 1273 | + if original_filetype in [FileType.PPT, FileType.PPTX, FileType.PDF]: |
| 1274 | + return {"page_number": page_number} |
| 1275 | + return {} |
| 1276 | + |
| 1277 | + docs = [] |
| 1278 | + |
| 1279 | + for i, image in enumerate(images): |
| 1280 | + try: |
| 1281 | + img_np = np.array(image) |
| 1282 | + text = self.parser.parse(img_np) |
| 1283 | + docs.append((text, metadata(i))) |
| 1284 | + except Exception as e: |
| 1285 | + logger.error(f"Failed to process an image. Reason: {e}") |
| 1286 | + continue |
| 1287 | + |
| 1288 | + if self.concatenate_pages and len(docs) > 1: |
| 1289 | + concatenated_text = "\n\n".join([doc[0] for doc in docs]) |
| 1290 | + docs = [(concatenated_text, {"page_number": 0})] |
| 1291 | + |
| 1292 | + return docs |
0 commit comments