Created
January 22, 2021 01:11
-
-
Save hoangph-1744/83e53e53e6ecceb90ed6d116e2393bff to your computer and use it in GitHub Desktop.
Graph Modeling - Invoice data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import pandas as pd | |
import cv2 | |
import os | |
import matplotlib.pyplot as plt | |
import math | |
import itertools | |
import networkx as nx | |
class Grapher:
    """Build a word-level layout graph for one invoice/receipt.

    The class produces:
        1) the graph in dictionary form
           ``{source_node: [destination_node1, destination_node2]}`` and as a
           networkx graph, and
        2) a dataframe holding, for every word, its neighbours and the
           relative distances to them.

    Inputs are read from disk in ``__init__``: a bounding-box file, the
    matching image, and an interim csv that carries the labels.
    """

    def __init__(self, filename, data_fd):
        """Load the raw boxes, the image and the labelled interim csv.

        Args:
            filename: file stem (no extension) shared by the three inputs.
            data_fd: data folder containing ``raw/box``, ``raw/img`` and
                ``interim``.
        """
        self.filename = filename
        self.data_fd = data_fd
        file_path = os.path.join(self.data_fd, "raw/box", filename + '.csv')
        interim_path = os.path.join(self.data_fd, "interim", filename + '.csv')
        image_path = os.path.join(self.data_fd, "raw/img", filename + '.jpg')

        # One raw text line per row in column 0; the fields are split later in
        # graph_formation(). The file is read manually because current pandas
        # rejects read_csv(sep='\n'). Blank lines are skipped, as read_csv did.
        with open(file_path, encoding='utf-8-sig') as handle:
            raw_lines = [line.rstrip('\r\n') for line in handle if line.strip()]
        self.df = pd.DataFrame({0: raw_lines}, dtype=object)

        # cv2.imread returns None instead of raising when the image cannot be
        # read; graph_formation() asserts on that.
        self.image = cv2.imread(image_path)
        self.df_withlabels = pd.read_csv(interim_path)

    @staticmethod
    def _parse_boxes(raw):
        """Turn the one-column frame of raw lines into xmin/ymin/xmax/ymax/Object."""
        fields = raw[0].str.split(',', expand=True)
        fields[fields.columns] = fields.apply(lambda col: col.str.strip())
        fields.fillna('', inplace=True)
        # Text that itself contained commas was split over columns 9+; glue it
        # back onto column 8.
        fields[8] = fields[8].str.cat(fields.iloc[:, 9:], sep=", ")
        # Strips any trailing run of commas/spaces (padding left by the join).
        fields[fields.columns] = fields.apply(lambda col: col.str.rstrip(", ,"))
        # Keep columns 0, 1, 4, 5 as the box corners and column 8 as the text.
        fields = fields.loc[:, :8].drop(columns=[2, 3, 6, 7])
        fields.columns = ['xmin', 'ymin', 'xmax', 'ymax', 'Object']
        corners = ['xmin', 'ymin', 'xmax', 'ymax']
        fields[corners] = fields[corners].apply(pd.to_numeric)
        return fields

    @staticmethod
    def _group_into_lines(tops, bottoms):
        """Group word positions into lines by vertical overlap.

        Words are visited in the given order. A word that is not yet part of a
        line opens a new one and collects every other unassigned word ``b``
        with ``top_a <= bottom_b and bottom_a >= top_b``.

        Returns:
            list of lists of word positions, one inner list per line.
        """
        assigned = set()
        lines = []
        count = len(tops)
        for first in range(count):
            if first in assigned:
                continue
            line = [first]
            for other in range(count):
                if other == first or other in assigned:
                    continue
                if tops[first] <= bottoms[other] and bottoms[first] >= tops[other]:
                    line.append(other)
            assigned.update(line)
            lines.append(line)
        return lines

    def graph_formation(self, export_graph=False):
        """Order the words into reading order and connect nearest neighbours.

        Line formation:
            1) Sort words by their top coordinate.
            2) Two words W_a and W_b share a line when
               Top(W_a) <= Bottom(W_b) and Bottom(W_a) >= Top(W_b).
            3) Sort the words of each line by their left coordinate.
        This yields top-to-bottom, left-to-right reading order. Image
        coordinates are used, so y grows downwards.

        Edge formation:
            - horizontal: consecutive words of a line are linked
              (``right`` / ``left``).
            - vertical: each word is linked to the first later word (in
              reading order) whose x-range overlaps its own and that has not
              already been taken as another word's ``bottom`` neighbour
              (``bottom`` / ``top``).

        Args:
            export_graph: when True, also save a plot of the graph under
                ``../../figures/graphs``.

        Returns:
            (G, result, df): the networkx graph, the adjacency dictionary it
            was built from, and the dataframe with columns ``index``,
            ``line_number``, ``right``, ``left``, ``bottom``, ``top`` and
            ``labels`` added. ``self.df`` is replaced by that dataframe.
        """
        # Preprocess the raw csv lines into a usable dataframe.
        df = self._parse_boxes(self.df)
        image = self.image

        assert isinstance(df, pd.DataFrame), \
            f'object_map should be of type {pd.DataFrame}. Received {type(df)}'
        assert isinstance(image, np.ndarray), \
            f'image should be of type {np.ndarray} . Received {type(image)}'
        for column in ('xmin', 'xmax', 'ymin', 'ymax'):
            assert column in df.columns, f'"{column}" not in object map'
        assert 'Object' in df.columns, '"Object" column not in object map'

        # Remove surrounding whitespace; numeric columns have no .str
        # accessor and are deliberately skipped.
        for col in df.columns:
            try:
                df[col] = df[col].str.strip()
            except AttributeError:
                pass

        df.dropna(inplace=True)

        # Sort from top to bottom.
        df.sort_values(by=['ymin'], inplace=True)
        df.reset_index(drop=True, inplace=True)

        # ymax is shrunk by one pixel so that boxes which merely touch are not
        # treated as overlapping. The reduced ymax stays in the returned frame.
        df["ymax"] = df["ymax"] - 1

        # 1) + 2) group the words into lines; line numbers start at 1.
        lines = self._group_into_lines(df['ymin'].tolist(), df['ymax'].tolist())
        line_of_word = {}
        for number, members in enumerate(lines, start=1):
            for word in members:
                line_of_word[word] = number
        df['line_number'] = [line_of_word[word] for word in range(len(df))]

        # 3) sort the words of each line by their left coordinate.
        df = df.sort_values(by=['line_number', 'xmin'], ascending=True)\
               .reset_index(drop=True)

        # Expose the reading-order position as an explicit 'index' column; it
        # is the node id used by every edge below.
        df.reset_index(inplace=True)

        # Horizontal edges: neighbouring words of the same line.
        horizontal_connections = {}
        nodes_per_line = [members['index'].tolist()
                          for _, members in df.groupby('line_number')]
        for nodes in nodes_per_line:
            for current, following in zip(nodes, nodes[1:]):
                horizontal_connections[current] = following
                df.loc[df['index'] == current, 'right'] = int(following)
                df.loc[df['index'] == following, 'left'] = int(current)

        # Vertical edges: first later word with an overlapping x-range that is
        # not yet somebody's bottom neighbour.
        bottom_connections = {}
        claimed = set()
        lefts = df['xmin'].tolist()
        rights = df['xmax'].tolist()
        total = len(df)
        for upper in range(total):
            for lower in range(upper + 1, total):
                if lower in claimed:
                    continue
                if lefts[lower] <= rights[upper] and rights[lower] >= lefts[upper]:
                    bottom_connections[upper] = lower
                    claimed.add(lower)
                    df.loc[df['index'] == upper, 'bottom'] = lower
                    df.loc[df['index'] == lower, 'top'] = upper
                    # Only the nearest match is kept.
                    break

        # A direction column only comes into existence with its first edge;
        # make sure all four exist so relative_distance() can rely on them.
        for column in ('right', 'left', 'bottom', 'top'):
            if column not in df.columns:
                df[column] = np.nan

        # Combine both edge sets into one adjacency dictionary.
        result = {}
        for key in (horizontal_connections.keys() | bottom_connections.keys()):
            if key in horizontal_connections:
                result.setdefault(key, []).append(horizontal_connections[key])
            if key in bottom_connections:
                result.setdefault(key, []).append(bottom_connections[key])

        G = nx.from_dict_of_lists(result)

        if export_graph:
            os.makedirs('../../figures/graphs', exist_ok=True)
            plot_path = '../../figures/graphs/' + self.filename + 'plain_graph' '.jpg'
            print(plot_path)

            # Draw on a dedicated figure and close it afterwards so repeated
            # calls neither overlay graphs nor accumulate open figures.
            figure = plt.figure()
            try:
                layout = nx.spring_layout(G)
                nx.draw(G, layout, with_labels=True)
                # NOTE(review): PNG data is written to a '.jpg' path; kept
                # as-is so existing outputs do not change.
                plt.savefig(plot_path, format="PNG", dpi=600)
            finally:
                plt.close(figure)

        # Connect with the interim file that has the labels in it.
        # NOTE(review): this aligns on the row index, so it presumably expects
        # the interim rows to already be in this reading order -- confirm.
        df['labels'] = self.df_withlabels['9']

        self.df = df
        return G, result, df

    # features calculation
    def get_text_features(self, df):
        """Add per-word character-count features to ``df`` in place.

        Adds the columns ``n_upper``, ``n_alpha``, ``n_spaces``,
        ``n_numeric`` and ``n_special``, each counting the matching characters
        of the ``Object`` text.

        Args:
            df: dataframe with an ``Object`` column of strings.

        Returns:
            None; ``df`` is modified in place.
        """
        special_chars = frozenset('&@#()-+=*%.,\\/|:')

        n_upper, n_alpha, n_spaces, n_numeric, n_special = [], [], [], [], []
        for text in df['Object'].tolist():
            n_upper.append(sum(char.isupper() for char in text))
            n_alpha.append(sum(char.isalpha() for char in text))
            n_spaces.append(sum(char.isspace() for char in text))
            n_numeric.append(sum(char.isnumeric() for char in text))
            n_special.append(sum(char in special_chars for char in text))

        # NOTE(review): a lower-case count was computed by earlier code but
        # never stored; it is still not emitted so the feature set is
        # unchanged -- confirm whether an 'n_lower' column is wanted.
        df['n_upper'], df['n_alpha'], df['n_spaces'], \
            df['n_numeric'], df['n_special'] = n_upper, n_alpha, n_spaces, n_numeric, n_special

    def relative_distance(self, export_document_graph=False):
        """Compute relative distances to the four neighbours of every word.

        1) ``rd_r`` / ``rd_l`` are the horizontal gaps to the right / left
           neighbour divided by the image width; ``rd_b`` / ``rd_t`` are the
           vertical gaps to the bottom / top neighbour divided by the image
           height. A missing neighbour gives 0.
        2) Optionally draws boxes, edges and distances onto ``self.image`` and
           writes the result under ``../../figures/graphs``.

        Args:
            export_document_graph: when True, export the annotated image.

        Returns:
            ``self.df`` (result of ``graph_formation``) with the ``rd_*`` and
            text-feature columns added.
        """
        df, img = self.df, self.image
        image_height, image_width = self.image.shape[0], self.image.shape[1]

        def cell(node, column):
            """Value of ``column`` for the row whose 'index' equals ``node``."""
            return df.loc[df['index'] == node, column].values[0]

        def store(node, column, value):
            """Write ``value`` into ``column`` for the row of ``node``."""
            df.loc[df['index'] == node, column] = value

        for node in df['index'].to_list():
            right_node = cell(node, 'right')
            left_node = cell(node, 'left')
            bottom_node = cell(node, 'bottom')
            top_node = cell(node, 'top')

            # NaN marks "no neighbour in that direction".
            if not np.isnan(right_node):
                right_word_left = cell(right_node, 'xmin')
                store(node, 'rd_r', (right_word_left - cell(node, 'xmax')) / image_width)
                # For plotting: centre of the neighbour's bounding box.
                store(node, 'destination_x_hori',
                      (cell(right_node, 'xmax') + right_word_left) / 2)
                store(node, 'destination_y_hori',
                      (cell(right_node, 'ymax') + cell(right_node, 'ymin')) / 2)

            if not np.isnan(left_node):
                store(node, 'rd_l',
                      (cell(left_node, 'xmax') - cell(node, 'xmin')) / image_width)

            if not np.isnan(bottom_node):
                bottom_word_top = cell(bottom_node, 'ymin')
                store(node, 'rd_b', (bottom_word_top - cell(node, 'ymax')) / image_height)
                # For plotting: centre of the neighbour's bounding box.
                store(node, 'destination_y_vert',
                      (cell(bottom_node, 'ymax') + bottom_word_top) / 2)
                store(node, 'destination_x_vert',
                      (cell(bottom_node, 'xmax') + cell(bottom_node, 'xmin')) / 2)

            if not np.isnan(top_node):
                store(node, 'rd_t',
                      (cell(top_node, 'ymax') - cell(node, 'ymin')) / image_height)

        # A column is only created by its first edge; add whatever is missing
        # so documents lacking a direction do not raise KeyError below.
        helper_columns = ['destination_x_hori', 'destination_y_hori',
                          'destination_y_vert', 'destination_x_vert']
        for column in ['rd_r', 'rd_b', 'rd_l', 'rd_t'] + helper_columns:
            if column not in df.columns:
                df[column] = np.nan

        # Replace NaN with 0, meaning there is nothing in that direction.
        df[['rd_r', 'rd_b', 'rd_l', 'rd_t']] = df[['rd_r', 'rd_b', 'rd_l', 'rd_t']].fillna(0)

        if export_document_graph:
            edge_specs = (('destination_x_vert', 'destination_y_vert', 'rd_b'),
                          ('destination_x_hori', 'destination_y_hori', 'rd_r'))
            for _, row in df.iterrows():
                # Bounding box; OpenCV needs plain integer coordinates.
                cv2.rectangle(img,
                              (int(row['xmin']), int(row['ymin'])),
                              (int(row['xmax']), int(row['ymax'])),
                              (0, 0, 255), 2)

                source_x = (row['xmax'] + row['xmin']) / 2
                source_y = (row['ymax'] + row['ymin']) / 2
                for dest_x_col, dest_y_col, distance_col in edge_specs:
                    if np.isnan(row[dest_x_col]):
                        continue
                    cv2.line(img,
                             (int(source_x), int(source_y)),
                             (int(row[dest_x_col]), int(row[dest_y_col])),
                             (0, 255, 0), 2)
                    # Label the edge with its distance at the midpoint.
                    text = "{:.3f}".format(row[distance_col])
                    text_coordinates = (int((row[dest_x_col] + source_x) / 2),
                                        int((row[dest_y_col] + source_y) / 2))
                    cv2.putText(img, text, text_coordinates,
                                cv2.FONT_HERSHEY_DUPLEX, 0.4, (255, 0, 0), 1)

            os.makedirs('../../figures/graphs', exist_ok=True)
            plot_path = '../../figures/graphs/' + self.filename + 'docu_graph' '.jpg'
            cv2.imwrite(plot_path, img)

        # Drop the plotting-only columns.
        df.drop(helper_columns, axis=1, inplace=True)
        self.get_text_features(df)
        return df
if __name__ == "__main__":
    import sys

    # Usage: python <this script> [file_stem] [data_folder]
    # Grapher requires the data folder as its second argument; calling it with
    # the file stem alone raises TypeError.
    file = sys.argv[1] if len(sys.argv) > 1 else '339'
    # NOTE(review): the default data folder is an assumption chosen to sit
    # next to the '../../figures' output folder used by Grapher -- confirm.
    data_fd = sys.argv[2] if len(sys.argv) > 2 else '../../data'

    connect = Grapher(file, data_fd)
    G, result, df = connect.graph_formation(export_graph=True)
    df = connect.relative_distance(export_document_graph=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Good