Skip to content

Instantly share code, notes, and snippets.

@andsilver
Last active September 8, 2023 02:15
Show Gist options
  • Save andsilver/410a32d88edcbfb2d0aaf30a13b09909 to your computer and use it in GitHub Desktop.
Save andsilver/410a32d88edcbfb2d0aaf30a13b09909 to your computer and use it in GitHub Desktop.
Parse a PDF document using docTR and find the legal description, with bounding-box information, using pandas
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
import pandas as pd
import numpy as np
from fuzzywuzzy import fuzz
from pdb import set_trace
import json
import sys
# OCR predictor built once at import time; ParseReport.__init__ calls it as
# ``model(data)``. It pairs the 'db_resnet50' detection architecture with the
# 'crnn_vgg16_bn' recognition architecture.
model = ocr_predictor(det_arch='db_resnet50',
reco_arch='crnn_vgg16_bn', pretrained=True)
class ParseReport:
    """OCR a PDF/image report and expose its text as pandas frames.

    After construction two frames are available:

    * ``self.lines`` -- one row per OCR line; ``value`` is the line's words
      joined with spaces and ``x1``/``y1``/``x2``/``y2`` come from the
      geometry of the line's *first* word (``y1``/``y2`` are additionally
      offset by ``page_idx``).
    * ``self.words`` -- one row per OCR word with its text in ``value`` and
      its own ``x1``/``y1``/``x2``/``y2``.

    Both frames carry ``page_idx``, ``block_idx`` and ``line_idx`` columns,
    which the lookup methods use to regroup words into lines and blocks.
    """

    # Columns every "two adjacent words" search needs from the words frame.
    _PAIR_COLUMNS = ["page_idx", "block_idx", "line_idx",
                     "x1", "y1", "x2", "y2", "value"]
    # A candidate pair is kept when ``100 - fuzz.ratio(...)`` is below this.
    _MAX_PAIR_SCORE = 10

    def __init__(self, path: str, dump_path='./example.json'):
        """
        Runs OCR on a document and flattens the result into data frames.
        :param path: file to parse; a path ending in '.pdf' is read as a PDF, anything else as a single image.
        :param dump_path: where the raw OCR export is written as JSON (pass None to skip the dump).
        """
        if path.endswith('.pdf'):
            data = DocumentFile.from_pdf(file=path)
        else:
            data = DocumentFile.from_images(files=[path])
        pd.set_option("display.precision", 6)
        result = model(data)
        json_output = result.export()
        if dump_path is not None:
            with open(dump_path, "w") as f:
                json.dump(json_output, f, indent=2)

        # pages -> blocks -> lines -> words: each level is exploded into one
        # row per child, re-indexed 0..n-1 so the positional frame produced by
        # json_normalize lines up in the join, then expanded into columns.
        df = pd.DataFrame(json_output)
        pages = df.join(pd.json_normalize(df.pop('pages')))

        blocks = pages.explode("blocks")
        blocks['block_idx'] = np.arange(blocks.shape[0])
        blocks['index'] = blocks['block_idx']
        blocks = blocks.set_index('index')
        blocks = blocks.join(pd.json_normalize(blocks.pop('blocks')))
        blocks = blocks.rename(columns={'geometry': 'block_geometry'})

        lines = blocks.explode("lines")
        lines['line_idx'] = np.arange(lines.shape[0])
        lines['index'] = np.arange(lines.shape[0])
        lines = lines.set_index('index')
        lines = lines.join(pd.json_normalize(lines.pop('lines')))
        lines = lines.rename(columns={'geometry': 'line_geometry'})

        # Line-level frame: coordinates are read from the first word of each
        # line, then the word list is collapsed into a single string.
        save_lines = lines.copy()
        save_lines["x1"] = save_lines['words'].apply(
            lambda x: x[0]['geometry'][0][0])
        save_lines["x2"] = save_lines['words'].apply(
            lambda x: x[0]['geometry'][1][0])
        save_lines["y1"] = save_lines['words'].apply(
            lambda x: x[0]['geometry'][0][1])
        save_lines["y2"] = save_lines['words'].apply(
            lambda x: x[0]['geometry'][1][1])
        # Adding the page index makes y grow from one page to the next, so
        # sorting by y keeps lines in document order.
        save_lines["y1"] += save_lines["page_idx"]
        save_lines["y2"] += save_lines["page_idx"]
        save_lines['words'] = save_lines['words'].apply(
            lambda x: ' '.join(list(map(lambda item: item['value'], x))))
        save_lines = save_lines.rename(columns={'words': 'value'})
        save_lines = save_lines.join(
            pd.json_normalize(save_lines.pop('line_geometry')))
        self.lines = save_lines

        # Word-level frame.
        words = lines.explode("words")
        words['word_idx'] = np.arange(words.shape[0])
        words['index'] = np.arange(words.shape[0])
        words = words.set_index('index')
        words = words.join(pd.json_normalize(words.pop('words')))
        words = words.rename(columns={'geometry': 'word_geometry'})
        words["word_geometry"] = words.word_geometry.apply(
            lambda x: {"x1": x[0][0], "y1": x[0][1], "x2": x[1][0], "y2": x[1][1]})
        self.words = words.join(pd.json_normalize(words.pop('word_geometry')))

    @staticmethod
    def _line_text(context, row):
        """
        Joins the words of the line that `row` belongs to.
        :param context: word-level frame to read from.
        :param row: any row carrying page_idx, block_idx and line_idx.
        :return: the line's words joined with single spaces.
        """
        same_line = ((context['page_idx'] == row['page_idx'])
                     & (context['block_idx'] == row['block_idx'])
                     & (context['line_idx'] == row['line_idx']))
        return " ".join(context.loc[same_line, 'value'].values)

    @staticmethod
    def _score_pairs(context, word1, word2, ignore_case=False, with_next_box=False):
        """
        Scores consecutive candidate words against the phrase "word1 word2".
        :param context: word-level frame to search.
        :param word1: first word of the phrase (used as a pattern for the pre-filter).
        :param word2: second word of the phrase (used as a pattern for the pre-filter).
        :param ignore_case: if True the pre-filter ignores case; the fuzzy score itself is always case-sensitive.
        :param with_next_box: if True also keep the following word's box as x12/y12/x22/y22.
        :return: rows whose score is below the threshold, best first (score, then gap, then vertical misalignment).
        """
        values = context['value']
        needle1, needle2 = word1, word2
        if ignore_case:
            values = values.str.lower()
            needle1, needle2 = word1.lower(), word2.lower()
        # na=False: rows without text simply do not match.
        mask = (values.str.contains(needle1, na=False)
                | values.str.contains(needle2, na=False))
        # Work on a copy so the new columns never touch the caller's frame.
        pairs = context.loc[mask, ParseReport._PAIR_COLUMNS].copy()
        following = pairs.shift(-1)

        pairs['paired'] = pairs['value'] + " " + following['value']
        # Horizontal gap to the next candidate and mean vertical offset of
        # the two boxes; both are magnitudes so that "closest" sorts first.
        pairs['space'] = (following['x1'] - pairs['x2']).abs()
        pairs['align'] = ((following['y1'] - pairs['y1']).abs()
                          + (following['y2'] - pairs['y2']).abs()) / 2
        if with_next_box:
            pairs['x12'] = following['x1']
            pairs['y12'] = following['y1']
            pairs['x22'] = following['x2']
            pairs['y22'] = following['y2']

        target = str(word1) + " " + str(word2)
        pairs['score'] = pairs['paired'].apply(
            lambda text: 100 - fuzz.ratio(str(text), target))
        pairs = pairs[pairs['score'] < ParseReport._MAX_PAIR_SCORE]
        return pairs.sort_values(by=['score', 'space', 'align'])

    def find_attribute_one(self, word, extract_value=True, context=None):
        """
        Finds attributes values with one word.
        :param word: A word (pattern) to match against the words of the document.
        :param extract_value: if True returns only the value else returns the whole line.
        :param context: if there are any specific context to look at, to shorten the search space.
        :return: a string of value for an attribute, or None when nothing matches.
        """
        if context is None:
            context = self.words
        try:
            hits = context[context['value'].str.contains(word, na=False)]
            if hits.empty:
                return None
            # Several words may match; the first one in frame order wins.
            text = self._line_text(context, hits.iloc[0])
        except Exception:
            # Best-effort lookup: any malformed context yields "not found".
            return None
        if extract_value:
            return self.extract_value(text)
        return text

    def find_attribute_two(self, word1, word2, extract_value=True, context=None):
        """
        Finds attributes values with two words.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :param extract_value: if True returns only the value else returns the whole line.
        :param context: if there are any specific context to look at, to shorten the search space.
        :return: a string of value for an attribute, or None when nothing matches.
        """
        if context is None:
            context = self.words
        try:
            candidates = self._score_pairs(
                context, word1, word2, ignore_case=True)
            if candidates.empty:
                return None
            text = self._line_text(context, candidates.iloc[0])
        except Exception:
            # Best-effort lookup: any failure yields "not found".
            return None
        if extract_value:
            return self.extract_value(text)
        return text

    def company_name(self):
        """
        First line of the document, and assign as a company name.
        :return: string (company name)
        """
        words = self.words
        first_line = ((words['page_idx'] == 0)
                      & (words['block_idx'] == 0)
                      & (words['line_idx'] == 0))
        return " ".join(words.loc[first_line, 'value'].values)

    @staticmethod
    def extract_value(sentence):
        """
        Separates a string with key value combination, and returns value.
        :param sentence: a sentence to be split into key and value
        :return: the text after the last ':' with surrounding whitespace removed
        """
        return sentence.split(":")[-1].strip()

    def get_blocks(self, keywords, debug_csv='./test.csv'):
        """
        Groups lines into paragraphs and returns the paragraphs related to the keywords.
        :param keywords: patterns joined with '|' and matched against the lower-cased line text, so they should be lower case.
        :param debug_csv: path the annotated line table is written to (pass None to skip writing it).
        :return: list of dicts with keys 'text', 'paragraph' and 'block'; empty list when nothing matches.
        """
        context = self.lines.sort_values(['page_idx', 'y1']).copy()
        tops = context['y1'].tolist()
        bottoms = context['y2'].tolist()
        lefts = context['x1'].tolist()
        count = len(tops)

        def gap(upper, lower):
            # Vertical white space between two rows; overlapping rows count as 0.
            distance = tops[lower] - bottoms[upper]
            return distance if distance > 0 else 0

        paragraph_ids, prev_gaps, next_gaps = [], [], []
        paragraph_idx = 1
        for i in range(count):
            prev_m = gap(i - 1, i) if i > 0 else 0
            next_m = gap(i, i + 1) if i < count - 1 else 0
            if round(abs(prev_m - next_m), 2) != 0 and prev_m > next_m:
                # More space above than below: this line opens a paragraph.
                paragraph_idx += 1
            elif i > 0 and round(abs(lefts[i] - lefts[i - 1]), 1) > 0.3:
                # Large horizontal jump relative to the previous line. The
                # first row has no previous line, so it is never compared.
                paragraph_idx += 1
            paragraph_ids.append(paragraph_idx)
            prev_gaps.append(prev_m)
            next_gaps.append(next_m)

        # Whole-column assignment instead of per-cell chained indexing.
        context['paragraph_idx'] = paragraph_ids
        context['prev_m'] = prev_gaps
        context['next_m'] = next_gaps
        if debug_csv is not None:
            context.to_csv(debug_csv)

        pattern = '|'.join(keywords)
        matched = context[context['value'].str.lower().str.contains(
            pattern, na=False)]
        blocks = list(set(matched['block_idx'].to_list()))
        paragraphs = list(set(matched['paragraph_idx'].to_list()))
        if not blocks or not paragraphs:
            return []

        # Keep every line sharing an OCR block or a paragraph with a hit.
        selected = context[context['block_idx'].isin(blocks)
                           | context['paragraph_idx'].isin(paragraphs)]
        grouped = selected.groupby('paragraph_idx')
        # One aggregated row per paragraph, so identical paragraph texts
        # cannot collapse into each other.
        summary = pd.DataFrame({
            'text': grouped['value'].agg(lambda parts: ' '.join(parts)),
            'block': grouped['block_idx'].min(),
        })
        summary.insert(1, 'paragraph', summary.index)
        return summary.to_dict('records')

    def get_lines(self, word1, word2, context=None):
        """
        Finds lines containing two words from the whole document.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :param context: if there are any specific context to look at, to shorten the search space.
        :return: a pd.Dataframe with high score of being the lines we are looking for.
        """
        if context is None:
            context = self.words
        return self._score_pairs(context, word1, word2)

    def _collect_sections(self, word1, word2, fields):
        """
        Builds one dict per block whose heading matches "word1 word2".
        :param word1: first heading word.
        :param word2: second heading word.
        :param fields: mapping of output key -> tuple of one or two search words.
        :return: list of dicts; blocks where every field is None are skipped.
        """
        sections = []
        for _, row in self.get_lines(word1, word2).iterrows():
            block_words = self.words[(self.words.page_idx == row.page_idx) & (
                self.words.block_idx == row.block_idx)]
            entry = {}
            for label, terms in fields.items():
                finder = (self.find_attribute_one if len(terms) == 1
                          else self.find_attribute_two)
                entry[label] = finder(*terms, context=block_words)
            if any(value is not None for value in entry.values()):
                sections.append(entry)
        return sections

    def get_lien(self):
        """
        Finds information about 'Lien Types'.
        :return: information about lien types
        """
        return self._collect_sections("Lien", "Type:", {
            "Lien Type:": ("Lien", "Type:"),
            "Filed Against:": ("Filed", "Against:"),
            "Amount:": ("Amount:",),
            "Recorded Date:": ("Recorded", "Date:"),
            "Recording Information:": ("Recording", "Information:"),
            "Comment:": ("Comment:",),
        })

    def get_vesting_instrument(self):
        """
        Finds information about 'Vesting Instrument Type'
        :return: information about vesting instrument type
        """
        return self._collect_sections("Vesting", "Instrument", {
            "Vesting Instrument Type": ("Vesting", "Instrument"),
            "Executed": ("Executed:",),
            "Recorded": ("Recorded:",),
            "Recording Information": ("Recording", "Information:"),
            "Comment": ("Comment:",),
        })

    def get_instrument(self):
        """
        Finds information about 'Instrument Type'.
        :return: information about instrument type
        """
        return self._collect_sections("Instrument", "Type:", {
            "Instrument Type:": ("Instrument", "Type:"),
            "From:": ("From:",),
            "To:": ("To:",),
            "Executed:": ("Executed:",),
            "Recorded:": ("Recorded:",),
            "Recording Information:": ("Mortgage", "Recording"),
        })

    def find_table_pages(self, word1, word2):
        """
        Finds pages that can have 'Federal Tax Lien' tables.
        :param word1: first word of two to match with any line of the document
        :param word2: second word of two to match with any line of the document
        :return: a pd.Dataframe with pages with high score of being the pages we are looking for, or None on failure
        """
        try:
            return self._score_pairs(self.words, word1, word2)
        except Exception:
            # Best-effort search: callers treat None as "no table found".
            return None

    @staticmethod
    def find_column_values(report, context, word1, word2, right=0.0, left=0.0, height=0.21):
        """
        Finds values for a specific column for a specific table.
        :param report: the ParseReport whose words are searched.
        :param context: row (with page_idx and block_idx) locating the table; only words on its page are considered.
        :param word1: first word of the two-word column header
        :param word2: second word of the two-word column header
        :param right: increase or decrease in right of the words for a column
        :param left: increase or decrease in left of the words for a column
        :param height: height of the column to consider
        :return: a list of word rows for the values of the column, or None when the header is not found or on failure
        """
        try:
            c = report.words[(report.words.page_idx == context.page_idx)]
            headers = ParseReport._score_pairs(
                c, word1, word2, with_next_box=True)
            if headers.empty:
                return None
            # Candidates are sorted best-first; use the best header match.
            header = headers.iloc[0]

            # Words horizontally inside the header span and below it.
            column_data = c[(c.x1 >= header.x1 - left)
                            & (c.x2 <= header.x22 + right)
                            & (c.y1 - 0.01 >= header.y2)].copy()
            column_data['hd'] = abs(column_data.y1.shift(-1) - header.y2)
            column_data = column_data.sort_values(by=['hd'])
            column_data['bid'] = abs(
                column_data.block_idx.shift(-1) - column_data.block_idx)
            column_data['lid'] = abs(
                column_data.line_idx.shift(-1) - column_data.line_idx)
            column_data['h2d'] = abs(column_data.hd.shift(-1) - column_data.hd)
            column_data = column_data.sort_values(
                by=['hd', 'h2d', 'bid', "lid"])
            column_data = column_data.reset_index(drop=True)

            values = []
            last = len(column_data) - 1
            for i, k in column_data.iterrows():
                if context.block_idx == k.block_idx:
                    continue
                if i >= last:
                    # No following row to measure against: stop here and
                    # keep what was collected so far.
                    break
                if k.hd > height or column_data.iloc[i + 1].h2d > height:
                    break
                values.append(k)
            return values
        except Exception:
            # Best-effort extraction: callers treat None as "no values".
            return None

    def get_lien_tables(self):
        """
        Finds all the value in a structure way for a table.
        :return: list of tables, tables are a list of rows (the first row is the header)
        """
        notice_lien = self.find_table_pages("Notice", "Lien")
        if notice_lien is None:
            return []
        header = ["Kind of Tax (a)", "Tax Period Ending (b)", "Identifying Number (c)",
                  "Date of Assessment", "Last Day for Refining (e)", "Unpaid Balance of Assessment (f)"]
        lien_tables = []
        for _, table in notice_lien.iterrows():
            c = self.words[(self.words.page_idx == table.page_idx)]
            # find_column_values is static, so the report is passed explicitly.
            kind_of_tax = self.find_column_values(self, table, "Kind", "Tax")
            first_col = pd.DataFrame(kind_of_tax)
            rows = [list(header)]
            for _, v in first_col.iterrows():
                same_line = c[c.line_idx == v.line_idx]
                line_values = list(c[(c.y1 >= same_line.y1.min()) & (
                    c.y2 <= same_line.y2.max())].value.values)
                if not line_values:
                    continue
                # Move the first word to the end of the row.
                rows.append(line_values[1:] + line_values[:1])
            if len(rows) > 1:
                lien_tables.append(rows)
        return lien_tables
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment