Skip to content

Instantly share code, notes, and snippets.

@patsweet
Created October 8, 2013 14:38
Show Gist options
  • Save patsweet/6885763 to your computer and use it in GitHub Desktop.
Save patsweet/6885763 to your computer and use it in GitHub Desktop.
Takes a fixed-width file and transforms it into pipe-delimited for easy import into various databases.
# Parse County Tax Records into pipe-delimited files.
# The raw data are all in fixed-width format. The record layout
# lists each field's name and length. Because fields are parsed
# in order starting at character 0, it's easy to grab the correct
# portion of the line for each field. E.g.:
# 0-1 = period
# 1-2 = bill_type
# 2-17 = acct_num.
#
# The record layouts are found in the record_layouts directory.
import os
from collections import OrderedDict
# Directory containing this script; every other path is built relative to it.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Pipe-delimited description of the fixed-width record (field name|length).
REC_LAYOUT_FILE = os.path.join(BASE_DIR, 'record_layouts', 'main_layout.txt')

# Shared path prefixes, so each entry below only has to spell out a file name.
# Note the 2013 and 2011 deliveries are nested one directory deeper than 2012.
_CLEANED_DIR = os.path.join(BASE_DIR, 'cleaned_data')
_RAW_2013_DIR = os.path.join(BASE_DIR, '2013A bill files', '2013A bill files')
_RAW_2012_DIR = os.path.join(BASE_DIR, '2012A bill files')
_RAW_2011_DIR = os.path.join(BASE_DIR, '2011A bill files', '2011A bill files')

# Work list of (original fixed-width file, cleaned pipe-delimited file) pairs.
FILES = [
    # 2013
    (
        os.path.join(_RAW_2013_DIR, '2013AREG.TXT'),
        os.path.join(_CLEANED_DIR, '2013AREG_cleaned.txt'),
    ),
    (
        os.path.join(_RAW_2013_DIR, '2013ADNPIMAGE.TXT'),
        os.path.join(_CLEANED_DIR, '2013ADNPIMAGE_cleaned.txt'),
    ),
    # 2012
    (
        os.path.join(_RAW_2012_DIR, '2012AREG.TXT'),
        os.path.join(_CLEANED_DIR, '2012AREG_cleaned.txt'),
    ),
    (
        os.path.join(_RAW_2012_DIR, '2012Amort.TXT'),
        os.path.join(_CLEANED_DIR, '2012Amort_cleaned.txt'),
    ),
    # 2011
    (
        os.path.join(_RAW_2011_DIR, '2011AREG.TXT'),
        os.path.join(_CLEANED_DIR, '2011AREG_cleaned.txt'),
    ),
    (
        os.path.join(_RAW_2011_DIR, '2011ADONOTPAY.TXT'),
        os.path.join(_CLEANED_DIR, '2011ADONOTPAY_cleaned.txt'),
    ),
]
def getRecordLayout(IFILE):
    """
    Takes a pipe-delimited record layout from a text file
    and parses it into a list of dictionaries.

    Each non-blank line of IFILE is read as ``<fieldname>|<length>``;
    any further pipe-separated columns on the line are ignored.
    Blank lines (such as a trailing empty line) are skipped.

    Returns a list, in file order, of
    {'field': <str fieldname>, 'length': <int field length>}

    Raises IndexError if a non-blank line contains no pipe, and
    ValueError if the length column is not an integer.
    """
    rec_layout = []
    with open(IFILE) as ifile:
        # Iterate the file object directly; there is no need to load
        # every line into a list first.
        for line in ifile:
            # A blank line carries no field definition. Without this
            # guard it would split to [''] and fail on data[1].
            if not line.strip():
                continue
            data = line.strip('\r\n').split('|')
            rec_layout.append({'field': data[0], 'length': int(data[1])})
    return rec_layout
def main():
    """Gets each file and parses it using the record layout.

    For every (original, cleaned) pair in FILES, the original file is
    read line by line. The first line is skipped (presumably a header
    row -- confirm against the raw data). Each remaining line is sliced
    into consecutive fields using the lengths from the record layout,
    runs of whitespace inside each field are collapsed to a single
    space, and the fields are written to the cleaned file joined by '|'.
    """
    rec_layout = getRecordLayout(REC_LAYOUT_FILE)
    for IFILE, OFILE in FILES:
        # Parenthesised single-argument print works on Python 2 and 3.
        print("Parsing %s" % IFILE)
        with open(IFILE, 'r') as ifile, open(OFILE, 'w') as ofile:
            # Discard the first line, then stream the rest so the whole
            # input never has to be held in memory at once.
            next(ifile, None)
            for row in ifile:
                row = row.strip('\r\n')
                start = 0
                # Values are kept in a list, not a dict keyed by field
                # name, so that layouts with repeated names (e.g. several
                # filler fields) do not silently lose columns.
                fields = []
                for rec in rec_layout:
                    end = start + rec['length']
                    # split()/join trims the ends and collapses internal
                    # whitespace runs to one space.
                    fields.append(" ".join(row[start:end].split()))
                    start = end
                ofile.write('|'.join(fields) + '\n')
        print("Finished %s" % IFILE)
        print("")
# Run the conversion only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment