A Python program to sanitize CSV files for loading into MySQL
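
Typical invocation (the file names here are illustrative, not part of the gist):

    python cleancsv.py contacts.csv --split 100000 --map mapping.csv

This cleans contacts.csv, applies the column mapping in mapping.csv, and writes
the result as contacts_01.csv, contacts_02.csv, ... in roughly 100,000-row pieces.
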
#!/usr/bin/env python
#
# MUST be at least Python 3.7
#
import argparse
import csv
import os
import re
import sys
from collections import OrderedDict
ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
outcolumns = [] # list of header names for out columns, in order
outcolsize = [] # max width of column
outdefault = [] # list of default values for out rows, in column order
outmap = [] # maps input fields to output fields. More than one input field can map to the same output, for concatenation
args = None
#
# mapping setup
# o must be CSV with header row
# o col1 is output column name
# o col2 is input column names, ';' delimited, or blank if new column
# o col3 is the separator used when concatenating multiple input columns,
#   or the default value when col2 is blank
#
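# A minimal example mapping file (column names are hypothetical):
#
#   output,input,default
#   full_name,first_name;last_name,
#   country,,Canada
#
# full_name is built by concatenating first_name and last_name (a single
# space is used as the separator when col3 is blank); country is a new
# column that defaults to 'Canada'.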
def tui():
    global args
    parser = argparse.ArgumentParser(description='CSV Cleanup Program', prog='cleancsv',
                                     epilog='Contact Charles Godwin magnum@godwin.ca for assistance. All untitled columns will be ignored and removed.')
    parser.add_argument('filename', default='', help='File to clean up. Must be a .csv or .txt file')
    parser.add_argument('--input', default='excel', choices=['excel', 'excel-tab', 'unix'],
                        help='Select type of data in input file. (default: %(default)s)')
    parser.add_argument('--encoding', default='iso-8859-1',
                        help='Select type of data encoding in input file. (default: %(default)s)')
    parser.add_argument('--delimiter', default=',',
                        help='Column delimiter. (default: %(default)s)')
    parser.add_argument('--output', default='excel', choices=['excel', 'excel-tab', 'unix'],
                        help='Select type of output file. (default: %(default)s)')
    parser.add_argument('--split', default=200000, type=int,
                        help='Number of rows in each output file. Use 0 for no splitting. (default: %(default)s)')
    parser.add_argument('--map', type=argparse.FileType('r'),
                        help='Column mapping CSV file. Omit if possible.')
    parser.add_argument('--noheaderrow', action='store_true',
                        help='Do not generate column headers. (default: %(default)s)')
    args = parser.parse_args()
    if args.split == 0:
        args.split = sys.maxsize  # effectively disables splitting
    filepath = os.path.abspath(args.filename)
    if args.map is not None:
        setupMapping(args.map)
    main(filepath)
def setupMapping(mapfile):
    global outcolumns, outdefault, outmap
    csvreader = csv.reader(mapfile)
    for row in csvreader:
        if csvreader.line_num == 1:  # skip the header row of the mapping file
            continue
        outcolumn = row[0].strip()
        sourcecolumns = row[1].strip()
        column3 = row[2]  # Do NOT strip() this field: it may be a separator such as ' '
        outcolumns.append(outcolumn)
        if sourcecolumns:
            if column3 == '':
                column3 = ' '  # dummy space separator if none was given
            incolumns = sourcecolumns.split(';')
            for incolumn in incolumns:
                outmap.append([incolumn, outcolumn, column3])
            column3 = ''  # mapped columns carry no default value
        outdefault.append(column3.strip())
    mapfile.close()
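# With the example mapping file above, setupMapping would leave:
#   outcolumns = ['full_name', 'country']
#   outmap     = [['first_name', 'full_name', ' '], ['last_name', 'full_name', ' ']]
#   outdefault = ['', 'Canada']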
#
# This strips all weird characters, mostly from word processors, out of the string
#
def cleanString(value):
    global ILLEGAL_CHARACTERS_RE
    if value.strip() == '':
        return ''
    if not value.isascii():
        # mis-encoded (UTF-8 read as Latin-1) sequences and Unicode punctuation
        value = value.replace("’", "'")
        value = value.replace("“", '"')
        value = value.replace('�', "'")
        value = value.replace('—', "-")
        value = value.replace(u'\xe2\x80\x93', "'")
        value = value.replace(u'\xe2\x80\x94', "-")
        value = value.replace('â€"', '"')
        value = value.replace('â€' + '\x93', '_')
        value = value.replace('â€' + '\x9d', '"')
        value = value.replace("â„¢", '\u2122')
        value = value.replace("Ã©", 'é')
        value = value.replace("½", '|')
        value = value.replace("…", '...')
        value = value.replace('”', '"')
        value = value.replace('’', '\'')
        value = value.replace("''", "\"")
        value = value.replace(" - ", "-")
        value = value.replace('\u000b\u0009', '/')
        value = value.replace('\x9d', ' ')
        value = value.replace(u'\u2013', '-')
        value = value.replace(u'\u2014', '-')
        value = value.replace(u'\u2018', '\'')
        value = value.replace(u'\u2019', '\'')
        value = value.replace(u'\u201f', '"')
        value = value.replace(u'\u2020', '+')   # dagger
        value = value.replace(u'\u2026', '.')   # ellipsis
    value = value.replace('\r', '')
    value = value.replace('\t', ' ')
    value = value.replace('\n', '/')
    value = re.sub(r'"+', '"', value)    # collapse runs of double quotes
    value = re.sub(r'\'+', '\'', value)  # collapse runs of single quotes
    value = ILLEGAL_CHARACTERS_RE.sub('', value)
    value = re.sub(r"\s+", ' ', value).strip()  # collapse whitespace runs
    return value
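# Illustrative behaviour of cleanString (worked out from the code above):
#   cleanString('\u201csmart\u201d\u2019s\ttest\n')  ->  '"smart"\'s test/'
# Curly quotes become ASCII quotes, the tab becomes a space and the
# newline becomes '/'.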
def main(filename):
    global outcolumns, outdefault, outmap, args, outcolsize
    filepath = os.path.abspath(filename)
    basename = os.path.basename(filename)
    basename, file_extension = os.path.splitext(basename)
    clean_name = cleanString(basename)
    clean_name = re.sub("[^0-9a-zA-Z]+", "_",
                        clean_name.replace('#', "num").replace('__', '_'))
    if clean_name[0:1] == '_':
        clean_name = clean_name[1:]
    if clean_name[-1:] == '_':
        clean_name = clean_name[:-1]
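    # Illustration (hypothetical file name): an input named
    # 'Sales #2 (May).csv' yields clean_name 'Sales_num2_May', so its
    # output files are Sales_num2_May_01.csv, Sales_num2_May_02.csv, ...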
    filedir = os.path.dirname(filepath)
    outfile_seq = 1
    if not os.path.isfile(filepath):
        print('{} is not a valid file name'.format(filepath))
        sys.exit(1)
    if file_extension != '.csv' and file_extension != '.txt':
        print('{} does not have a valid file extension.'.format(filepath))
        sys.exit(1)
    # some CSV files have BOM bytes https://en.wikipedia.org/wiki/Byte_order_mark#UTF-8
    # 0xEF,0xBB,0xBF
    # these need to be stripped for phpMyAdmin to work right
    #
    # infile = open(filepath, newline='')
    infile = open(filepath, newline='', encoding=args.encoding)
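    # Note: Python's standard utf-8-sig codec strips a leading UTF-8 BOM on
    # read, so BOM-prefixed files can be handled with --encoding utf-8-sig.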
    if args.input == 'excel-tab':
        dialect = csv.excel_tab
    elif args.input == 'unix':
        dialect = csv.unix_dialect
    else:
        dialect = csv.excel
    csvreader = csv.reader(infile, dialect=dialect, delimiter=args.delimiter)
    print('Converting: {0}{1}'.format(basename, file_extension))
    if args.output == 'excel-tab':
        dialect = csv.excel_tab
    elif args.output == 'unix':
        dialect = csv.unix_dialect
    else:
        dialect = csv.excel
    csvwriter = None
    blanks = 0
    increment = 0
    splitcount = 0
    interval = 10000  # flush the output file every 10,000 rows
    doheader = True
    outfile = None
    incolumns = []
    rowcount = 0
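    # With the default --split of 200000, a 450,000-row input is written as
    # roughly 200,000 rows each to <clean_name>_01.csv, _02.csv and _03.csv.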
    for row in csvreader:
        rowcount += 1
        # start a new output file every args.split rows
        if int(rowcount / args.split) != splitcount and csvwriter is not None:
            splitcount = int(rowcount / args.split)
            outfile.close()
            csvwriter = None
        if csvwriter is None:
            outfilename = f"{clean_name}_{outfile_seq:02d}.csv"
            outfile_seq += 1
            csvfilename = os.path.join(filedir, outfilename)
            outfile = open(csvfilename, 'w', newline='', encoding='utf-8')
            csvwriter = csv.writer(
                outfile, dialect=dialect, quoting=csv.QUOTE_ALL, quotechar='"')
            print(f"Writing to {outfilename}")
            doheader = True
        if csvreader.line_num == 1:
            # header row: build the input and output column lists
            blank = 0
            for title in row:
                title = cleanString(title)
                title = re.sub("[^0-9a-zA-Z]+", "_",
                               title.replace('#', "num").replace('__', '_'))
                if title[0:1] == '_':
                    title = title[1:]
                if title[-1:] == '_':
                    title = title[:-1]
                if title:
                    if len(outmap) == 0:
                        outcolumns.append(title)
                else:
                    blank += 1
                incolumns.append(title)
            print()
            if blank > 0:
                print('There were {0} of {1} unidentified input columns'.format(
                    blank, len(incolumns)), end=' ')
            else:
                print('There are {0} input columns'.format(
                    len(incolumns)), end=' ')
            print('and {0} output columns'.format(len(outcolumns)))
            for x in range(0, len(outcolumns)):
                outcolsize.append(0)
            if not args.noheaderrow:
                csvwriter.writerow(outcolumns)
            doheader = False
            continue
        if doheader and not args.noheaderrow:
            csvwriter.writerow(outcolumns)
            doheader = False
        if int(rowcount / interval) != increment:
            increment = int(rowcount / interval)
            outfile.flush()
        indata = []
        for cell in row:
            indata.append(cleanString(cell))
        if ''.join(indata).strip() == '':
            blanks += 1
            continue  # skip blank line
        if len(row) != len(incolumns):
            print('\nRow @ line {0} has {1} columns, expected {2}: {3}'.format(
                csvreader.line_num, len(row), len(incolumns), row))
        inputfields = OrderedDict(zip(incolumns, indata))
        outdata = []
        if len(outmap) == 0:  # default behaviour: copy titled columns through
            for key in outcolumns:
                outdata.append(inputfields[key])
        else:
            # mapped behaviour: start from the defaults, then concatenate each
            # mapped input field, using the mapping's predefined separator
            outputfields = OrderedDict(zip(outcolumns, outdefault))
            for colmap in outmap:
                target = colmap[1]
                data = inputfields[colmap[0]]
                if outputfields[target] and data:
                    outputfields[target] += colmap[2]  # predefined separator
                outputfields[target] += data
            outdata = list(outputfields.values())
        for x in range(0, len(outdata)):
            if len(outdata[x]) > outcolsize[x]:
                outcolsize[x] = len(outdata[x])
        csvwriter.writerow(outdata)
    if outfile is not None:
        outfile.close()
    infile.close()
    print()
    print("Processed {:,} rows".format(rowcount))
    if blanks > 0:
        print('{0} rows were empty and were ignored.'.format(blanks))
    print()
    # report the widest value seen in each output column
    width = len('Column')
    for x in range(0, len(outcolumns)):
        if width <= len(outcolumns[x]):
            width = len(outcolumns[x])
    formatstr = '{:<' + str(width) + '} {:>' + str(len('Length')) + '}'
    print(formatstr.format("Column", 'Length'))
    for x in range(0, len(outcolumns)):
        print(formatstr.format(outcolumns[x], outcolsize[x]))
    print(",".join(outcolumns))


if __name__ == '__main__':
    tui()