Created
January 8, 2020 16:15
-
-
Save urigoren/a7288d4a6d60fcfd537267f3b1c7603c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pdfplumber | |
import itertools, collections, sys, os, re, json | |
from pprint import pprint as pr | |
from copy import deepcopy | |
from operator import itemgetter as at | |
class CartesianText:
    """A piece of text together with its bounding box.

    Instances are built from character dicts (keys ``text``, ``x0``, ``x1``,
    ``y0``, ``y1`` and ``page_number``) and can be concatenated with ``+``;
    the result covers the union of both bounding boxes.  The vertical
    coordinates are shifted by ``(page_number - 1) * page_height`` so that
    characters from different pages live in one coordinate space.
    """

    __slots__ = ["text", "x0", "x1", "y0", "y1", "page_height"]

    def __init__(self, d=None, page_height=792.0):
        """Initialise from a character dict.

        :param d: character dict; when it is not a dict only ``page_height``
            is set and the remaining slots stay unassigned.
        :param page_height: height used to offset ``y0``/``y1`` per page.
        """
        self.page_height = page_height
        if isinstance(d, dict):
            self.x0, self.x1, self.y0, self.y1 = map(float, at("x0", "x1", "y0", "y1")(d))
            page_offset = int(d["page_number"] - 1) * float(page_height)
            self.y0 += page_offset
            self.y1 += page_offset
            text = d["text"]
            # Check every code point: calling ord() on the whole string
            # raises TypeError as soon as the text holds more than one
            # character.
            self.text = text if text and all(ord(ch) < 128 for ch in text) else ""

    def __repr__(self):
        try:
            return "({x0:0.2f},{y0:0.2f})-({x1:0.2f},{y1:0.2f}): '{txt}'".format(
                txt=self.text, x0=self.x0, x1=self.x1, y0=self.y0, y1=self.y1)
        except AttributeError:
            # Slots are unassigned when the instance was built without a dict.
            return "Undefined"

    def _with_text(self, text):
        """Return a copy of this instance carrying ``text`` instead of its own."""
        clone = CartesianText(None, self.page_height)
        for name in ("x0", "x1", "y0", "y1"):
            if hasattr(self, name):
                setattr(clone, name, getattr(self, name))
        clone.text = text
        return clone

    def _join(self, other, other_first):
        """Shared implementation of ``__add__`` and ``__radd__``.

        :param other: a ``str``, a character dict, another instance, or a
            falsy value (e.g. the ``0`` that ``sum()`` starts with).
        :param other_first: put ``other``'s text before this instance's text.
        :return: a new instance, or ``self`` when ``other`` adds no text.
        """
        if isinstance(other, str):
            # Build a new object instead of mutating the operand in place.
            return self._with_text(other + self.text if other_first else self.text + other)
        if not other:
            return self
        if isinstance(other, dict):
            # Convert before touching ``.text`` (a dict has no such attribute)
            # and keep this instance's page height for the page offset.
            other = CartesianText(other, self.page_height)
        if not other.text:
            return self
        merged = CartesianText(None, self.page_height)
        merged.x0 = min(self.x0, other.x0)
        merged.y0 = min(self.y0, other.y0)
        merged.x1 = max(self.x1, other.x1)
        merged.y1 = max(self.y1, other.y1)
        merged.text = other.text + self.text if other_first else self.text + other.text
        return merged

    def __add__(self, other):
        """Return ``self`` followed by ``other``."""
        return self._join(other, other_first=False)

    def __radd__(self, other):
        """Return ``other`` followed by ``self`` (also makes ``sum()`` work)."""
        return self._join(other, other_first=True)

    def to_dict(self):
        """Return the text and bounding box as a dict."""
        return {
            "text": self.text,
            "x0": self.x0,
            "x1": self.x1,
            "y0": self.y0,
            "y1": self.y1,
        }

    def to_tuple(self):
        """Return ``(x0, x1, y0, y1, text)`` with coordinates rounded to 3 digits."""
        return (
            round(self.x0, 3),
            round(self.x1, 3),
            round(self.y0, 3),
            round(self.y1, 3),
            self.text,
        )
def calc_line_stats(lines):
    """Filter each line down to its ASCII characters and compute spacing stats.

    Every kept character dict is annotated in place with ``next_x0``: the
    ``x0`` of the kept character that follows it on the line, or ``None``
    for the last one.

    :param lines: list of lines, each a list of character dicts with the
        keys ``text``, ``x0``, ``x1`` and ``y0``.
    :return: ``(kept_lines, line_stats)``; the two lists are parallel.
        Lines with no kept character are dropped from both.  Each stats
        dict holds ``y0``, the median/std/min/max of the horizontal gaps
        between consecutive kept characters (all ``0`` when the line has a
        single character), ``min_x``, ``max_x`` and ``len_x`` (the number
        of collected x coordinates, two per kept character).
    """
    ret = []
    line_stats = []
    for line in lines:
        next_x0 = None
        kept = []
        x_diffs = []
        xs = []
        y0 = None
        # Walk right-to-left so each character can see its successor's x0.
        for d in reversed(line):
            text = d["text"]
            # Test each code point; ord() on the whole string raises
            # TypeError for empty or multi-character text.
            if not text or any(ord(ch) >= 128 for ch in text):
                continue
            x0, x1, y0 = map(float, (d["x0"], d["x1"], d["y0"]))
            d["next_x0"] = next_x0
            kept.append(d)
            xs.extend([x0, x1])
            # Compare against None so a successor at x0 == 0.0 still counts.
            if next_x0 is not None:
                x_diffs.append(next_x0 - x1)
            next_x0 = x0
        if not kept:
            continue
        # Restore left-to-right order once instead of insert(0, ...) per char.
        kept.reverse()
        ret.append(kept)
        line_stats.append({
            "y0": y0,
            # Guard the empty case: numpy returns NaN and emits a
            # RuntimeWarning for a line with a single character.
            "char_diff_median": np.median(x_diffs) if x_diffs else 0.0,
            "char_diff_std": np.std(x_diffs) if x_diffs else 0.0,
            "char_diff_min": np.min(x_diffs) if x_diffs else 0,
            "char_diff_max": np.max(x_diffs) if x_diffs else 0,
            "min_x": np.min(xs),
            "max_x": np.max(xs),
            "len_x": len(xs),
        })
    return ret, line_stats
def line_median_space_split(lines, line_stats, median_coef=0.7, min_space=2.5, min_length=5):
    """Cut lines into segments wherever the horizontal gap is large enough.

    A cut is made right after an element when all of the following hold:
    its ``next_x0`` is truthy, its ``text`` is longer than ``min_length``,
    and the gap ``next_x0 - x1`` exceeds the larger of
    ``median_coef * char_diff_median`` and ``min_space * char_diff_min``
    taken from the matching entry of ``line_stats``.

    NOTE(review): when the elements are single-character dicts,
    ``len(text) > min_length`` cannot hold for the default ``min_length``;
    confirm whether the segment length was meant instead.

    :param lines: list of lines, each a list of dicts with ``text``, ``x1``
        and ``next_x0``.
    :param line_stats: per-line stats dicts, parallel to ``lines``.
    :return: flat list of segments (lists of the original dicts).
    """
    segments = []
    for chars, stats in zip(lines, line_stats):
        current = []
        for char in chars:
            current.append(char)
            upcoming_x0 = char["next_x0"]
            if not upcoming_x0:
                continue
            if len(char["text"]) <= min_length:
                continue
            gap = upcoming_x0 - float(char["x1"])
            threshold = max(
                median_coef * stats["char_diff_median"],
                min_space * stats["char_diff_min"],
            )
            if gap > threshold:
                segments.append(current)
                current = []
        # Keep the trailing segment only when it holds something truthy.
        if any(current):
            segments.append(current)
    return segments
def repair_line_breaks(lines):
    """Join lines that sit alone on their ``y0`` with a line above them.

    For every item that is the only one with its ``y0``, the last item in
    ``lines`` that has the same ``x0`` and a ``y1`` below that ``y0`` is
    taken as its predecessor.  A pair is joined as
    ``predecessor + "\\n" + item`` when ``predecessor.x1 - item.x0`` is
    smaller than the median difference between consecutive distinct ``y0``
    values.

    NOTE(review): the filter compares a horizontal extent against a
    vertical gap; confirm that this is intended.

    :param lines: objects exposing ``x0``, ``x1``, ``y0``, ``y1`` and ``+``.
    :return: list of the joined objects; unpaired items are not returned.
    """
    by_y0 = collections.defaultdict(list)
    for item in lines:
        by_y0[item.y0].append(item)

    distinct_ys = sorted(by_y0)
    median_y_diff = np.median(
        [upper - lower for lower, upper in zip(distinct_ys, distinct_ys[1:])]
    )

    def last_or_none(candidates):
        return candidates[-1] if any(candidates) else None

    # First pair every lone item with its predecessor (or None) ...
    pairs = []
    for group in by_y0.values():
        if len(group) != 1:
            continue
        lone = group[0]
        above = [cand for cand in lines if cand.x0 == lone.x0 and cand.y1 < lone.y0]
        pairs.append((lone, last_or_none(above)))

    # ... then join the pairs that pass the distance filter.
    joined = []
    for post, pre in pairs:
        if post and pre and pre.x1 - post.x0 < median_y_diff:
            joined.append(pre + "\n" + post)
    return joined
def parse(pdf_file, output=dict):
    """Extract text segments with bounding boxes from a PDF file.

    Characters are grouped into runs of equal ``y0``, filtered and
    annotated by ``calc_line_stats``, cut into segments by
    ``line_median_space_split`` and finally merged into one
    ``CartesianText`` per segment.

    :param pdf_file: passed to ``pdfplumber.open``.
    :param output: ``tuple`` to get ``to_tuple()`` results, ``dict`` (the
        default) to get ``to_dict()`` results; anything else returns the
        ``CartesianText`` objects themselves.
    :return: list with one entry per segment.
    """
    with pdfplumber.open(pdf_file) as pdf:
        # The height of the first page is used for every page offset.
        page_height = float(pdf.pages[0].height)
        char_rows = []
        for _, same_y0 in itertools.groupby(pdf.chars, at("y0")):
            char_rows.append(list(same_y0))

    char_rows, row_stats = calc_line_stats(char_rows)
    segments = line_median_space_split(char_rows, row_stats)
    # NOTE: repair_line_breaks is not applied here (its call was commented out).
    merged = []
    for segment in segments:
        merged.append(sum(CartesianText(char, page_height) for char in segment))

    if output == tuple:
        return [piece.to_tuple() for piece in merged]
    if output == dict:
        return [piece.to_dict() for piece in merged]
    return merged
if __name__ == "__main__":
    # Usage: script [input.pdf [output.csv]]
    # Without an output path the parsed tuples are pretty-printed instead.
    cli_args = sys.argv[1:]
    in_file = cli_args[0] if cli_args else None
    out_file = cli_args[1] if len(cli_args) > 1 else None
    # Fall back to the author's default document when no input is given.
    in_file = in_file or "/Users/urigoren/Downloads/document.pdf"
    lines = parse(in_file, tuple)
    if not out_file:
        pr(lines)
    else:
        with open(out_file, 'w') as f:
            f.write("x0,x1,y0,y1,text\n")
            for x0, x1, y0, y1, text in lines:
                # Double embedded quotes so the quoted text column stays valid CSV.
                escaped = text.replace('"', '""')
                f.write('{x0},{x1},{y0},{y1},"{text}"\n'.format(
                    x0=x0, x1=x1, y0=y0, y1=y1, text=escaped,
                ))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment