A Python program to sanitize CSV files for loading into MySQL.
#!/usr/bin/env python
#
# MUST be at least python 3.7
#
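# Example usage (file names here are hypothetical; see --help for all options):
#
#   python cleancsv.py contacts.csv --split 100000
#   python cleancsv.py contacts.txt --input excel-tab --map mapping.csv
#
# Output is written alongside the input as <cleaned_name>_01.csv,
# <cleaned_name>_02.csv, ... in UTF-8 with every field quoted.
#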
import argparse
import csv
import os
import re
import sys
from collections import OrderedDict

ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')

outcolumns = []  # list of header names for out columns, in order
outcolsize = []  # max width of column
outdefault = []  # list of default values for out rows, in column order
outmap = []      # map of input fields to output fields. More than one input field can map to the same output for concatenation
args = None
#
# mapping setup
#  o must be CSV with header row
#  o col1 is output column name
#  o col2 is input column names, ';' delimited, or blank if new column
#  o col3 is default value if cell is empty or no value in col2
# (an example mapping file is sketched below)
#
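# For illustration only -- a hypothetical mapping file (these column names are
# invented, not part of this gist) might look like:
#
#   output,input,default
#   FULL_NAME,first;last,
#   STATUS,,active
#   EMAIL,email,
#
# FULL_NAME concatenates 'first' and 'last' (a blank col3 becomes a single
# space separator), STATUS is a new column defaulting to 'active', and EMAIL
# is a straight copy.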
def tui():
    global args
    parser = argparse.ArgumentParser(description='CSV Cleanup Program', prog='cleancsv',
                                     epilog='Contact Charles Godwin magnum@godwin.ca for assistance. All untitled columns will be ignored and removed.')
    parser.add_argument('filename', default='', help='File to clean up. Must be a .csv or .txt file')
    parser.add_argument('--input', default='excel', choices=['excel', 'excel-tab', 'unix'], help='Select type of data in input file. (default: %(default)s)')
    parser.add_argument('--encoding', default='iso-8859-1', help='Select type of data encoding in input file. (default: %(default)s)')
    parser.add_argument('--delimiter', default=',', help='Column delimiter. (default: %(default)s)')
    parser.add_argument('--output', default='excel', choices=['excel', 'excel-tab', 'unix'], help='Select type of output file. (default: %(default)s)')
    parser.add_argument('--split', default=200000, type=int, help='Number of rows in each output file. Use 0 for no splitting. (default: %(default)s)')
    parser.add_argument('--map', type=argparse.FileType('r'), help='Column mapping CSV file. Omit if not needed.')
    parser.add_argument('--noheaderrow', action='store_true', help='Do not generate column headers (default: %(default)s)')
    args = parser.parse_args()
    if args.split == 0:
        args.split = sys.maxsize
    filepath = os.path.abspath(args.filename)
    if args.map is not None:
        setupMapping(args.map)
    main(filepath)
def setupMapping(mapfile):
    global outcolumns, outdefault, outmap
    csvreader = csv.reader(mapfile)
    for row in csvreader:
        if csvreader.line_num == 1:  # skip headers in CSV file
            continue
        outcolumn = row[0].strip()
        incolumns = None
        sourcecolumns = row[1].strip()
        column3 = row[2]  # Do NOT strip() this field!! It may be a separator such as a single space.
        outcolumns.append(outcolumn)
        if sourcecolumns:
            # col3 doubles as the concatenation separator when input columns are given
            if column3 == '':
                column3 = ' '  # dummy space if needed
            incolumns = sourcecolumns.split(';')
            for incolumn in incolumns:
                outmap.append([incolumn, outcolumn, column3])
            column3 = ''  # strip the default
        outdefault.append(column3.strip())
    mapfile.close()
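# Given the hypothetical mapping sketched earlier, setupMapping() would produce:
#
#   outcolumns = ['FULL_NAME', 'STATUS', 'EMAIL']
#   outdefault = ['', 'active', '']
#   outmap     = [['first', 'FULL_NAME', ' '],
#                 ['last',  'FULL_NAME', ' '],
#                 ['email', 'EMAIL',     ' ']]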
#
# This strips all weird characters, mostly from word processors, out of the string
#
def cleanString(value):
    global ILLEGAL_CHARACTERS_RE
    if value.strip() == '':
        return ''
    if not value.isascii():
        # normalize smart punctuation and common UTF-8 mojibake sequences
        value = value.replace("’", "'")
        value = value.replace("“", '"')
        value = value.replace('�', "'")
        value = value.replace('—', "'")
        value = value.replace(u'\xe2\x80\x93', "'")
        value = value.replace(u'\xe2\x80\x94', "-")
        value = value.replace('â€"', '"')
        value = value.replace('â€' + '\x93', '_')
        value = value.replace('â€' + '\x9d', '"')
        value = value.replace("â„¢", '\u2122')
        value = value.replace("é", 'é')
        value = value.replace("½", '|')
        value = value.replace("…", '...')
        value = value.replace('”', '"')
        value = value.replace('’', '\'')
        value = value.replace("''", "\"")
        value = value.replace(" - ", "-")
        value = value.replace('\u000b\u0009', '/')
        value = value.replace('\x9d', ' ')
        value = value.replace(u'\u2013', '-')
        value = value.replace(u'\u2014', '-')
        value = value.replace(u'\u2018', '\'')
        value = value.replace(u'\u2019', '\'')
        value = value.replace(u'\u201f', '"')
        value = value.replace(u'\u2020', '+')
        value = value.replace(u'\u2026', '.')
    # control characters and whitespace are cleaned for ASCII strings too
    value = value.replace('\r', '')
    value = value.replace('\t', ' ')
    value = value.replace('\n', '/')
    value = re.sub(r'"+', '"', value)
    value = re.sub(r'\'+', '\'', value)
    value = ILLEGAL_CHARACTERS_RE.sub('', value)
    value = re.sub(r"\s+", ' ', value).strip()
    return value
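# For example (traced by hand, not part of the original gist):
#
#   cleanString('It’s\tfine\n')  ->  "It's fine/"
#
# The curly apostrophe becomes a straight quote, the tab collapses into a
# single space, and the embedded newline is flattened to '/' so the value
# stays on one CSV row.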
def main(filename):
    global outcolumns, outdefault, outmap, args, outcolsize
    filepath = os.path.abspath(filename)
    basename = os.path.basename(filename)
    basename, file_extension = os.path.splitext(basename)
    # derive a safe output file name from the input name
    clean_name = cleanString(basename)
    clean_name = re.sub("[^0-9a-zA-Z]+", "_",
                        clean_name.replace('#', "num").replace('__', '_'))
    if clean_name[0:1] == '_':
        clean_name = clean_name[1:]
    if clean_name[-1:] == '_':
        clean_name = clean_name[:-1]
    filedir = os.path.dirname(filepath)
    outfile_seq = 1
    if not os.path.isfile(filepath):
        print('{} is not a valid file name'.format(filepath))
        sys.exit(1)
    if file_extension != '.csv' and file_extension != '.txt':
        print('{} does not have a valid file extension.'.format(filepath))
        sys.exit(1)
    # some CSVs have BOM bytes https://en.wikipedia.org/wiki/Byte_order_mark#UTF-8
    # 0xEF,0xBB,0xBF
    # these need to be stripped for phpmyadmin to work right
    infile = open(filepath, newline='', encoding=args.encoding)
    if args.input == 'excel-tab':
        dialect = csv.excel_tab
    elif args.input == 'unix':
        dialect = csv.unix_dialect
    else:
        dialect = csv.excel
    csvreader = csv.reader(infile, dialect=dialect, delimiter=args.delimiter)
    print('Converting: {0}{1}'.format(basename, file_extension))
    if args.output == 'excel-tab':
        dialect = csv.excel_tab
    elif args.output == 'unix':
        dialect = csv.unix_dialect
    else:
        dialect = csv.excel
    csvwriter = None
    blanks = 0
    increment = 0
    splitcount = 0
    interval = 10000  # flush the output file every 10,000 rows
    doheader = True
    outfile = None
    incolumns = []
    rowcount = 0
    for row in csvreader:
        rowcount += 1
        # close the current output file when the split threshold is crossed
        if int(rowcount / args.split) != splitcount and csvwriter is not None:
            splitcount = int(rowcount / args.split)
            outfile.close()
            csvwriter = None
        if csvwriter is None:
            # start a new (or the first) output file
            outfilename = f"{clean_name}_{outfile_seq:02d}.csv"
            outfile_seq += 1
            csvfilename = os.path.join(filedir, outfilename)
            outfile = open(csvfilename, 'w', newline='', encoding='utf-8')
            csvwriter = csv.writer(
                outfile, dialect=dialect, quoting=csv.QUOTE_ALL, quotechar='"')
            print(f"Writing to {outfilename}")
            doheader = True
        if csvreader.line_num == 1:
            # first row: clean up the column titles
            blank = 0
            for title in row:
                title = cleanString(title)
                title = re.sub("[^0-9a-zA-Z]+", "_",
                               title.replace('#', "num").replace('__', '_'))
                if title[0:1] == '_':
                    title = title[1:]
                if title[-1:] == '_':
                    title = title[:-1]
                if title:
                    if len(outmap) == 0:
                        outcolumns.append(title)
                else:
                    blank += 1
                incolumns.append(title)
            print()
            if blank > 0:
                print('There were {0} of {1} unidentified input columns'.format(
                    blank, len(incolumns)), end=' ')
            else:
                print('There are {0} input columns'.format(
                    len(incolumns)), end=' ')
            print('and {0} output columns'.format(len(outcolumns)))
            for x in range(0, len(outcolumns)):
                outcolsize.append(0)
            if not args.noheaderrow:
                csvwriter.writerow(outcolumns)
                doheader = False
            continue
        if doheader and not args.noheaderrow:
            csvwriter.writerow(outcolumns)
            doheader = False
        if int(rowcount / interval) != increment:
            increment = int(rowcount / interval)
            outfile.flush()
        indata = []
        for cell in row:
            indata.append(cleanString(cell))
        if ''.join(indata).strip() == '':
            blanks += 1
            continue  # skip blank line
        if len(row) != len(incolumns):
            print('\nRow @ line {0} has {1} columns, expected {2}: {3}'.format(
                csvreader.line_num, len(row), len(incolumns), row))
        inputfields = OrderedDict(zip(incolumns, indata))
        outdata = []
        if len(outmap) == 0:  # default behaviour
            for key in outcolumns:
                outdata.append(inputfields[key])
        else:
            # process when it's mapped
            outputfields = OrderedDict(zip(outcolumns, outdefault))
            for mapping in outmap:
                target = mapping[1]
                data = inputfields[mapping[0]]
                if outputfields[target] and data:
                    # use predefined separator
                    outputfields[target] += mapping[2]
                outputfields[target] += data
            outdata = list(outputfields.values())
        # track the widest value seen per column
        for x in range(0, len(outdata)):
            if len(outdata[x]) > outcolsize[x]:
                outcolsize[x] = len(outdata[x])
        csvwriter.writerow(outdata)
    print()
    print("Processed {0} rows".format("{:,}".format(rowcount)))
    if blanks > 0:
        print('{0} rows were empty and were ignored.'.format(blanks))
    print()
    # report the widest value seen in each output column
    width = len('Column')
    for x in range(0, len(outcolumns)):
        if width <= len(outcolumns[x]):
            width = len(outcolumns[x])
    formatstr = '{:<' + str(width) + '} {:>' + str(len('Length')) + '}'
    print(formatstr.format("Column", 'Length'))
    for x in range(0, len(outcolumns)):
        print(formatstr.format(outcolumns[x], outcolsize[x]))
    print(",".join(outcolumns))


if __name__ == '__main__':
    tui()
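# The sanitized files are UTF-8 with every field quoted, which suits a MySQL
# import. A minimal sketch, assuming a matching table already exists and the
# default 'excel' output dialect (table and file names here are illustrative,
# not part of this gist):
#
#   LOAD DATA LOCAL INFILE 'myfile_01.csv' INTO TABLE mytable
#       CHARACTER SET utf8mb4
#       FIELDS TERMINATED BY ',' ENCLOSED BY '"'
#       LINES TERMINATED BY '\r\n'
#       IGNORE 1 LINES;
#
# Drop IGNORE 1 LINES if the files were written with --noheaderrow. The
# Column/Length report printed at the end can guide the VARCHAR sizes.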