Skip to content

Instantly share code, notes, and snippets.

@robla
Created June 17, 2018 21:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robla/7664d03372e6a80f1372869c09472b60 to your computer and use it in GitHub Desktop.
Save robla/7664d03372e6a80f1372869c09472b60 to your computer and use it in GitHub Desktop.
sfballotparse.py - parse sfelections.sfgov.org/results ballot image files
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2018 Rob Lanphier
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
Parse the ballot image files provided by the City of San Francisco:
https://sfelections.sfgov.org/results
"""
# Fixed-width record layout for the ballot image file, written as CSV
# text that read_fieldspec() parses.  One row per field, in file order:
#   fieldname - key used for the field in each parsed record
#   lastchar  - 1-based column of the field's final character
#   length    - field width in characters
BALLOTIMAGE_FIELDSPEC = """fieldname,lastchar,length
Contest_Id,7,7
Pref_Voter_Id,16,9
Serial_Number,23,7
Tally_Type_Id,26,3
Precinct_Id,33,7
Vote_Rank,36,3
Candidate_Id,43,7
Over_Vote,44,1
Under_Vote,45,1
"""
# Fixed-width record layout for the master lookup file, written as CSV
# text that read_fieldspec() parses.  One row per field, in file order:
#   fieldname - key used for the field in each parsed record
#   lastchar  - 1-based column of the field's final character
#   length    - field width in characters
# convert_to_ballots() reads the Record_Type, Id and Description fields.
MASTERLOOKUP_FIELDSPEC = """fieldname,lastchar,length
Record_Type,10,10
Id,17,7
Description,67,50
List_Order,74,7
Candidates_Contest_Id,81,7
Is_WriteIn,82,1
Is_Provisional,83,1
"""
import argparse
from collections import OrderedDict
from copy import deepcopy
import csv
import fileinput
from io import StringIO
import json
import re
import sys
import urllib.parse
def read_fieldspec(fieldspec_string):
    """
    Build a fixed-width-record regex from a CSV field specification.

    The image files and the master lookup (among others) provided in
    SF elections are fixed-width text fields.  This function:
    1. ...reads the CSV text that specifies field names, the position
       of the last character of each field, and each field's length.
    2. ...builds a regex which can be used to read the fixed width
       fields, with one capture group per field in spec order.

    Args:
        fieldspec_string: CSV text with a header row and the columns
            ``fieldname``, ``lastchar`` and ``length``.

    Returns:
        A ``(regex, fieldnames)`` tuple.  ``regex`` is a pattern string
        anchored at the start of the line; ``fieldnames`` lists the
        field names in the same order as the regex capture groups.

    Raises:
        ValueError: if a row's ``length`` disagrees with the width
            implied by its ``lastchar`` and the previous row's
            ``lastchar``.
    """
    reader = csv.DictReader(StringIO(fieldspec_string))
    pattern_parts = [r'^']
    fieldnames = []
    pos = 0
    for field in reader:
        lastchar = int(field['lastchar'])
        width = lastchar - pos
        # Cross-check the two redundant columns *before* advancing pos;
        # once pos == lastchar the difference is always zero and the
        # check can never detect anything.
        if width != int(field['length']):
            raise ValueError(
                "Length mismatch in {0}".format(field['fieldname']))
        pattern_parts.append(r'(.{' + str(width) + r'})')
        fieldnames.append(field['fieldname'])
        pos = lastchar
    return (''.join(pattern_parts), fieldnames)
def read_data_file(fieldspec, datafile):
    """
    Lazily parse a fixed-width SF election data file into records.

    This function uses a regex (created in read_fieldspec) to convert
    each line of an SF election ballot image file into a Python
    OrderedDict suitable for output as JSON or YAML.

    Args:
        fieldspec: CSV field specification text, as accepted by
            read_fieldspec().
        datafile: file name (or sequence of file names) handed to
            ``fileinput.FileInput``.

    Yields:
        One OrderedDict per input line, mapping each field name to the
        raw (unstripped) text captured for that field.

    Raises:
        ValueError: if a line does not match the generated regex, or
            (from read_fieldspec) if the field spec is inconsistent.
    """
    (regex, fields) = read_fieldspec(fieldspec)
    matcher = re.compile(regex)
    # Use a private FileInput instance rather than fileinput.input():
    # the latter keeps module-global state, so starting a second reader
    # while another is only partly consumed raises "input() already
    # active".  The context manager also guarantees the file is closed.
    with fileinput.FileInput(files=datafile) as lines:
        for line in lines:
            regmatch = matcher.match(line)
            if regmatch is None:
                raise ValueError(
                    'generated regex does not match datafile'
                    ' ({}, line {})'.format(lines.filename(),
                                            lines.filelineno()))
            yield OrderedDict(zip(fields, regmatch.groups()))
def convert_to_ballots(imagelines, lookuplines, contestid=None):
    """
    Group per-preference image lines into one record per ballot.

    Each line of the ballot image file contains just one of many
    possible candidate preferences expressed on a given ballot.  For
    example, in the 2018 SF Mayoral race, voters could choose up to 3
    preferences for mayor.  Each preference expressed would have its own
    line in the image file.  This function aggregates all of the
    preferences expressed on a ballot into a single hierarchical data
    structure, with one set of ballotfields per ballot, and many sets
    of votefields (one set per candidate chosen).

    Lines belonging to the same ballot are recognised by consecutive
    equal ``Pref_Voter_Id`` values, so the input must keep each
    ballot's lines adjacent.

    Args:
        imagelines: iterable of parsed ballot image records (mappings
            keyed by the BALLOTIMAGE field names).
        lookuplines: iterable of parsed master lookup records (mappings
            with at least ``Record_Type``, ``Id`` and ``Description``).
        contestid: ``Contest_Id`` value to select; when falsy, the
            first 'Contest' record in ``lookuplines`` is used.

    Yields:
        One OrderedDict per ballot holding the ballot-level fields plus
        a ``votes`` list.  Each vote is an OrderedDict with ``rank``
        (int), ``candidate`` (name, or None for id '0000000') and, when
        flagged, ``exception`` ('overvote' or 'undervote').  Nothing is
        yielded when no image line matches the contest.

    Raises:
        ValueError: if a line is flagged as both overvote and undervote.
        KeyError: if a line references a candidate id that is not in
            ``lookuplines``.
    """
    ballotfields = [
        'Contest_Id',
        'Pref_Voter_Id',
        'Serial_Number',
        'Tally_Type_Id',
        'Precinct_Id'
    ]

    # build up dict to look up candidate names from id; the all-zero id
    # maps to "no candidate"
    candpool = OrderedDict()
    candpool['0000000'] = None
    for lookupline in lookuplines:
        record_type = lookupline['Record_Type'].strip()
        if record_type == 'Candidate':
            candpool[lookupline['Id']] = lookupline['Description'].strip()
        elif record_type == 'Contest' and not contestid:
            # default contestid will be the first one listed
            contestid = lookupline['Id']

    # None until the first matching image line starts a ballot
    thisballot = None
    for imageline in imagelines:
        # skip over all imagelines that aren't associated with the
        # contestid passed in
        if imageline['Contest_Id'] != contestid:
            continue

        # each ballot may result in several image lines (one for each
        # preference the voter marks).  See if this line is the same
        # voter/ballot as the previous line
        if (thisballot is None or
                thisballot['Pref_Voter_Id'] != imageline['Pref_Voter_Id']):
            # if the Pref_Voter_Id doesn't line up, that means we're
            # done with a ballot.  yield it from this function, then
            # start building a new ballot from this line.
            if thisballot is not None:
                yield thisballot
            thisballot = OrderedDict(
                (field, imageline[field]) for field in ballotfields)
            thisballot['votes'] = []

        # store the preference associated with this imageline in
        # "thisvote"
        thisvote = OrderedDict()
        thisvote['rank'] = int(imageline['Vote_Rank'])
        thisvote['candidate'] = candpool[imageline['Candidate_Id']]
        overvote = (imageline['Over_Vote'] == '1')
        undervote = (imageline['Under_Vote'] == '1')
        if overvote and undervote:
            raise ValueError('both overvote and undervote flagged')
        elif overvote:
            thisvote['exception'] = 'overvote'
        elif undervote:
            thisvote['exception'] = 'undervote'
        thisballot['votes'].append(thisvote)

    # now that we're out of the loop, yield the last ballot -- but only
    # if one was actually started, so an empty or non-matching input
    # does not produce a placeholder ballot with no 'votes'
    if thisballot is not None:
        yield thisballot
def dump_url_encoded(outputrecords, outfh):
    """
    Write one URL-encoded ``candidate=rank`` line per ballot.

    Args:
        outputrecords: iterable of ballot records as produced by
            convert_to_ballots(); each must have a ``votes`` list whose
            entries carry ``candidate`` and ``rank``.
        outfh: writable text file object receiving the output.

    A ballot may list any number of votes (not only three).  When the
    same candidate appears more than once on a ballot, only the first
    occurrence in the ``votes`` list is kept.
    """
    for rec in outputrecords:
        outrec = {}
        # votes listed earlier take priority over later duplicates of
        # the same candidate
        # NOTE(review): a candidate of None (e.g. id '0000000') is
        # emitted as the literal key "None" -- confirm that is what the
        # downstream consumer expects.
        for vote in rec['votes']:
            outrec.setdefault(vote['candidate'], vote['rank'])
        print(urllib.parse.urlencode(outrec), file=outfh)
def main(argv=None):
    """
    Command-line entry point: parse the input files and write the result.

    Args:
        argv: full argument vector *including* the program name (i.e.
            shaped like ``sys.argv``); when None, ``sys.argv`` is used.

    Raises:
        ValueError: if ``--outputformat`` is not 'json' or 'urlencoded',
            or if an input file does not match its field spec.
    """
    # using splitlines to just get the first line
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[1])
    parser.add_argument('--imagelines',
                        help='print records for imagelines',
                        action="store_true")
    parser.add_argument('lookupfile',
                        help='master lookup file for this election')
    parser.add_argument('imagefile', help='ballot image file')
    parser.add_argument('-o', '--outfile', help='output file',
                        default=None)
    parser.add_argument('--contestid',
                        help='contest id; defaults to first found',
                        default=None)
    parser.add_argument('--outputformat',
                        help='output format: json (default), urlencoded'
                        ' (for Brian Olson\'s voteutil)', default="json")
    # honour the argv parameter; drop argv[0] (the program name) since
    # the caller passes sys.argv
    args = parser.parse_args(None if argv is None else argv[1:])

    # reject an unknown format up front, before any output file is
    # opened (and thereby truncated)
    if args.outputformat not in ('json', 'urlencoded'):
        raise ValueError(
            'args.outputformat {} not recognized'.format(args.outputformat))

    lookuplines = list(read_data_file(
        MASTERLOOKUP_FIELDSPEC, args.lookupfile))
    imagelines = read_data_file(BALLOTIMAGE_FIELDSPEC, args.imagefile)

    if args.imagelines:
        # TODO: filter imagelines by args.contestid
        outputrecords = imagelines
    else:
        outputrecords = convert_to_ballots(imagelines, lookuplines,
                                           contestid=args.contestid)

    outfh = open(args.outfile, 'w') if args.outfile else sys.stdout
    try:
        if args.outputformat == 'json':
            # json cannot serialize a generator, so materialize it
            json.dump(list(outputrecords), outfh, indent=4)
        else:
            dump_url_encoded(outputrecords, outfh)
    finally:
        # close only what we opened; never close sys.stdout
        if outfh is not sys.stdout:
            outfh.close()
# Run the command-line interface only when executed as a script, and
# hand main()'s return value to the interpreter as the exit status.
if __name__ == '__main__':
    sys.exit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment