@sherwoac, created July 27, 2022 23:20
HSBC PDF statements to CSV
name: pdf_extractor_env
channels:
- conda-forge
- defaults
dependencies:
- _libgcc_mutex=0.1=main
- _openmp_mutex=5.1=1_gnu
- blas=1.0=openblas
- bottleneck=1.3.5=py39h7deecbd_0
- brotli=1.0.9=he6710b0_2
- brotlipy=0.7.0=py39h27cfd23_1003
- bzip2=1.0.8=h7b6447c_0
- ca-certificates=2022.07.19=h06a4308_0
- cairo=1.16.0=h19f5f5c_2
- camelot-py=0.9.0=pyhd8ed1ab_0
- certifi=2022.6.15=py39h06a4308_0
- cffi=1.15.0=py39hd667e15_1
- charset-normalizer=2.0.4=pyhd3eb1b0_0
- click=8.0.4=py39h06a4308_0
- cryptography=37.0.1=py39h9ce1e76_0
- cycler=0.11.0=pyhd3eb1b0_0
- dbus=1.13.18=hb2f20db_0
- distro=1.5.0=pyhd3eb1b0_1
- eigen=3.3.7=hd09550d_1
- et_xmlfile=1.1.0=py39h06a4308_0
- expat=2.4.4=h295c915_0
- ffmpeg=4.2.2=h20bf706_0
- fontconfig=2.13.1=h6c09931_0
- fonttools=4.25.0=pyhd3eb1b0_0
- freetype=2.11.0=h70c0345_0
- ghostscript=9.54.0=h27087fc_2
- giflib=5.2.1=h7b6447c_0
- glib=2.69.1=h4ff587b_1
- gmp=6.2.1=h295c915_3
- gnutls=3.6.15=he1e5248_0
- graphite2=1.3.14=h295c915_1
- gst-plugins-base=1.14.0=h8213a91_2
- gstreamer=1.14.0=h28cd5cc_2
- harfbuzz=4.3.0=hd55b92a_0
- hdf5=1.10.6=h3ffc7dd_1
- icu=58.2=he6710b0_3
- idna=3.3=pyhd3eb1b0_0
- intel-openmp=2021.4.0=h06a4308_3561
- jpeg=9e=h7f8727e_0
- kiwisolver=1.4.2=py39h295c915_0
- lame=3.100=h7b6447c_0
- lcms2=2.12=h3be6417_0
- ld_impl_linux-64=2.38=h1181459_1
- libffi=3.3=he6710b0_2
- libgcc-ng=11.2.0=h1234567_1
- libgfortran-ng=11.2.0=h00389a5_1
- libgfortran5=11.2.0=h1234567_1
- libgomp=11.2.0=h1234567_1
- libidn2=2.3.2=h7f8727e_0
- libopenblas=0.3.20=h043d6bf_1
- libopus=1.3.1=h7b6447c_0
- libpng=1.6.37=hbc83047_0
- libprotobuf=3.20.1=h4ff587b_0
- libstdcxx-ng=11.2.0=h1234567_1
- libtasn1=4.16.0=h27cfd23_0
- libtiff=4.2.0=h2818925_1
- libunistring=0.9.10=h27cfd23_0
- libuuid=1.0.3=h7f8727e_2
- libvpx=1.7.0=h439df22_0
- libwebp=1.2.2=h55f646e_0
- libwebp-base=1.2.2=h7f8727e_0
- libxcb=1.15=h7f8727e_0
- libxml2=2.9.14=h74e7548_0
- lz4-c=1.9.3=h295c915_1
- matplotlib-base=3.4.3=py39hbbc1b5f_0
- mkl=2021.4.0=h06a4308_640
- mkl-service=2.4.0=py39h7f8727e_0
- munkres=1.1.4=py_0
- ncurses=6.3=h5eee18b_3
- nettle=3.7.3=hbbd107a_1
- numpy=1.16.6=py39h0708ffd_4
- numpy-base=1.16.6=py39ha8aedfd_4
- opencv=4.5.5=py39h1ca2c5e_3
- openh264=2.1.1=h4ff587b_0
- openjdk=11.0.13=h87a67e3_0
- openjpeg=2.4.0=h3ad879b_0
- openpyxl=3.0.10=py39h5eee18b_0
- openssl=1.1.1q=h7f8727e_0
- packaging=21.3=pyhd3eb1b0_0
- pandas=1.2.4=py39ha9443f7_0
- pcre=8.45=h295c915_0
- pdfminer.six=20220524=pyhd8ed1ab_1
- pillow=9.2.0=py39hace64e9_1
- pip=22.1.2=py39h06a4308_0
- pixman=0.40.0=h7f8727e_1
- pycparser=2.21=pyhd3eb1b0_0
- pycryptodome=3.15.0=py39h276157c_0
- pyopenssl=22.0.0=pyhd3eb1b0_0
- pyparsing=3.0.4=pyhd3eb1b0_0
- pypdf2=2.4.2=pyhd8ed1ab_0
- pysocks=1.7.1=py39h06a4308_0
- python=3.9.12=h12debd9_1
- python-dateutil=2.8.2=pyhd3eb1b0_0
- python_abi=3.9=2_cp39
- pytz=2022.1=py39h06a4308_0
- qt=5.9.7=h5867ecd_1
- readline=8.1.2=h7f8727e_1
- requests=2.28.1=py39h06a4308_0
- setuptools=61.2.0=py39h06a4308_0
- setuptools-scm=7.0.4=py39h06a4308_0
- setuptools_scm=7.0.4=hd3eb1b0_0
- six=1.16.0=pyhd3eb1b0_1
- sqlite=3.38.5=hc218d9a_0
- tabula-py=2.2.0=py39hf3d152e_3
- tk=8.6.12=h1ccaba5_0
- tomli=2.0.1=py39h06a4308_0
- tornado=6.1=py39h27cfd23_0
- tqdm=4.64.0=py39h06a4308_0
- typing-extensions=4.1.1=hd3eb1b0_0
- typing_extensions=4.1.1=pyh06a4308_0
- tzdata=2022a=hda174b7_0
- urllib3=1.26.9=py39h06a4308_0
- wheel=0.37.1=pyhd3eb1b0_0
- x264=1!157.20191217=h7b6447c_0
- xz=5.2.5=h7f8727e_1
- zlib=1.2.12=h7f8727e_2
- zstd=1.5.2=ha4553b6_0
- pip:
  - opencv-python==4.6.0.66
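
To recreate the pinned environment, save the block above as a YAML file and run (the file name below is an assumption, use whatever name you saved it under):

conda env create -f pdf_extractor_env.yml
conda activate pdf_extractor_env
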
#!/usr/bin/python3
import csv
import io
import os
import sys
from collections import defaultdict

import camelot
import dateutil.parser
import pandas as pd
import tqdm

_brought_forward = 'BALANCE BROUGHT FORWARD'
_carried_forward = 'BALANCE CARRIED FORWARD'


def concat_description(lines):
    """
    concatenates the description of continuation lines (rows without amounts)
    onto the transaction row they belong to
    :param lines: parsed statement rows
    :return: only rows with amounts, each carrying its full description
    """
    # group continuation lines under the index of their amount-bearing row;
    # easier to do in reverse as every group ends with a final line
    continuation_to_final_lines = defaultdict(list)
    final_line = 0
    for continuation_line in reversed(range(len(lines))):
        line = lines[continuation_line]
        if len(line):
            if all([el == '' for el in line[-3:]]):
                continuation_to_final_lines[final_line].append(continuation_line)
            else:
                final_line = continuation_line
    # concatenate each group into a single replacement row
    for final_line in reversed(sorted(continuation_to_final_lines.keys())):
        continuation_lines = continuation_to_final_lines[final_line]
        descriptions = (lines[continuation_line][2]
                        for continuation_line in sorted(continuation_lines) + [final_line])
        desc = ' '.join(descriptions)
        replacement_line = lines[continuation_lines[0]][:2] + [desc] + lines[final_line][3:]
        # delete the unwanted lines bottom up so earlier indices stay valid
        del lines[final_line]
        for continuation_line in reversed(sorted(continuation_lines)[1:]):
            del lines[continuation_line]
        lines[sorted(continuation_lines)[0]] = replacement_line
    return lines
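
# Illustration (hypothetical rows, doctest-style): a row whose last three
# fields are empty is a continuation of the next amount-bearing row:
#   >>> concat_description([
#   ...     ['01 Jan 22', 'DD', 'ACME INSURANCE', '', '', ''],
#   ...     ['', '', 'PREMIUM 1234', '12.00', '', '1,000.00']])
#   [['01 Jan 22', 'DD', 'ACME INSURANCE PREMIUM 1234', '12.00', '', '1,000.00']]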


def write_out_lines(output_filename: str, lines: list):
    # newline='' lets the csv writer control line endings, per the csv docs
    with open(output_filename, 'w', encoding='UTF8', newline='') as f:
        writer = csv.writer(f)
        writer.writerows(lines)


def contains_elements_by_row(df: pd.DataFrame, elements: list):
    """
    whether each row contains any of the given elements
    :param df: dataframe to search
    :param elements: search terms
    :return: list of booleans, one per row
    """
    any_row = [False] * len(df)
    for index, row in df.iterrows():
        # relies on a RangeIndex: the row label doubles as the list position
        any_row[index] = any([row.str.contains(check_el).any() for check_el in elements])
    return any_row
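
# Example (hypothetical frame): only the row mentioning a balance marker matches:
#   >>> contains_elements_by_row(
#   ...     pd.DataFrame([['BALANCE BROUGHT FORWARD', ''], ['VISA PAYMENT', '9.99']]),
#   ...     [_brought_forward, _carried_forward])
#   [True, False]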

# column titles exactly as camelot extracts them from the HSBC statement PDFs
# (the odd spacing reproduces the PDF's character placement)
column_titles = ['Date\nPay m e nt\nt y pe and de t ails', 'Pay m e nt\nt y pe and de t ails', 'Paid out', 'Paid in', 'Balance']


def pdf_to_text(input_filename):
    tables = camelot.read_pdf(input_filename, flavor='stream', flag_size=True, pages='all')
    # per page, keep the index of the longest table that looks like a statement
    # table: at least two recognised column titles, plus exactly one
    # BROUGHT FORWARD row and one CARRIED FORWARD row
    table_page_to_table_index = {}
    for table_index, table in enumerate(tables):
        if table.df.isin(column_titles).any(axis=0).sum() > 1 \
                and sum(contains_elements_by_row(table.df, [_brought_forward, _carried_forward])) == 2:
            if table.page in table_page_to_table_index:
                print(f'check: {input_filename=} {table.page=}')
            if table.page not in table_page_to_table_index \
                    or len(tables[table_page_to_table_index[table.page]].df) < len(table.df):
                # keep the longer candidate for this page
                table_page_to_table_index[table.page] = table_index
    df = pd.DataFrame()
    for table_index in table_page_to_table_index.values():
        this_df = tables[table_index].df
        this_df.reset_index(drop=True, inplace=True)
        # drop columns that never contain one of the expected column titles
        # (drop by label, as positions shift after each drop)
        for column, is_column in this_df.isin(column_titles).any(axis=0).items():
            if not is_column:
                this_df.drop(columns=[column], inplace=True)
        this_df.columns = range(this_df.columns.size)
        # keep only the rows strictly between BROUGHT FORWARD and CARRIED FORWARD
        matches = contains_elements_by_row(this_df, [_brought_forward, _carried_forward])
        keep_from = matches.index(True)
        keep_to = matches.index(True, keep_from + 1)
        this_df = this_df[keep_from + 1:keep_to]
        df = pd.concat([df, this_df])
    df.reset_index(drop=True, inplace=True)
    # name the remaining columns, then recover date and payment type from the
    # description cell: both only appear in the PDF when they change
    df.columns = ['desc', 'out', 'in', 'balance']
    df['date'] = None
    df['type'] = None
    last_type = ''
    last_date = None
    for row_index, row in df.iterrows():
        # shortest fragments first: date and payment-type markers are shorter
        # than the transaction description
        split_desc = row['desc'].split('\n')
        split_desc.sort(key=len)
        if len(split_desc) > 2:
            for split_index, el in enumerate(split_desc):
                try:
                    last_date = dateutil.parser.parse(el).strftime('%Y-%m-%d')
                    del split_desc[split_index]
                    break
                except (dateutil.parser.ParserError, OverflowError):
                    continue
        if len(split_desc) > 1:
            last_type = split_desc[0]
        df.loc[row_index, 'type'] = last_type
        df.loc[row_index, 'date'] = last_date
        df.loc[row_index, 'desc'] = split_desc[-1]
    # reorder columns
    df = df[['date', 'type', 'desc', 'out', 'in', 'balance']]
    # round-trip through an in-memory csv so quoting is handled consistently
    # (line_terminator is renamed lineterminator in pandas >= 1.5)
    buffer = io.StringIO()
    df.to_csv(buffer, index=False, line_terminator='\n')
    buffer.seek(0)
    # splitting on \n is ugly, but csv.reader accepts any iterable of lines
    output = buffer.getvalue().split('\n')
    buffer.close()
    reader = csv.reader(output, delimiter=',', quotechar='"')
    lines = list(reader)
    # first row is the header; the empty string from the trailing \n is dropped
    return lines[0], lines[1:-1]
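
# Example (hypothetical path): returns the header row and one list per transaction:
#   columns, rows = pdf_to_text('statements/2022-06.pdf')
#   columns == ['date', 'type', 'desc', 'out', 'in', 'balance']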


if __name__ == '__main__':
    assert len(sys.argv) == 3, f'usage: {sys.argv[0]} <input_dir> <output_csv>'
    input_dir = sys.argv[1]
    output_file = sys.argv[2]
    print(f'processing: dir: {input_dir} to csv: {output_file}')
    assert os.path.isdir(input_dir), f'{input_dir=} does not exist'
    all_lines = []
    for pdf_filename in (progress_bar := tqdm.tqdm(sorted(os.listdir(input_dir)))):
        if pdf_filename.lower().endswith('.pdf'):
            progress_bar.set_description(f'{pdf_filename=}')
            columns, file_lines = pdf_to_text(os.path.join(input_dir, pdf_filename))
            file_lines = concat_description(file_lines)
            if not all_lines:
                # first file: seed the output with the header row
                all_lines = [columns]
            all_lines.extend(file_lines)
    write_out_lines(output_file, all_lines)
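
Run the script against a directory of HSBC PDF statements (the script file name below is an assumption, the gist does not record it); the first argument is the statements directory, the second the output CSV:

python hsbc_pdf_to_csv.py ./statements ./all_transactions.csv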