Skip to content

Instantly share code, notes, and snippets.

@yaniv-aknin
Created June 26, 2016 10:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaniv-aknin/6ffba866990689f74d300faedb1a133a to your computer and use it in GitHub Desktop.
Save yaniv-aknin/6ffba866990689f74d300faedb1a133a to your computer and use it in GitHub Desktop.
Process UK Land Registry data
#!/usr/local/opt/playground/bin/python
"""
Helper tool to process the UK Land Registry data
https://www.gov.uk/government/publications/land-registry-data/public-data
"""
import numpy
import functools
import multiprocessing
import csv
import datetime
import collections
import subprocess
import sys
import argparse
import logging
class CONFIGURATION:
    """Static settings consumed by ``main`` when it builds the per-file job."""

    # Extended (egrep-style) regular expression handed to an external grep
    # process that pre-filters the raw input lines. It is a premature
    # optimisation meant to reduce the number of lines Python has to parse;
    # the empty pattern lets every line through.
    INPUT_REGEX = ''

    # Ordered mapping of output column name -> Python expression source.
    # Each key becomes one output column. For every record that survived
    # the INPUT_REGEX filter the expression is evaluated with the name
    # ``record`` bound to that record; when the result is truthy the
    # record's price contributes to that column's monthly median.
    COLUMN_EXPRESSIONS = collections.OrderedDict([
        ('UK', 'True'),
        ('LONDON', 'record.county == "GREATER LONDON"'),
        ('SW11', 'record.postcode1 == "SW11"'),
    ])
class Record(object):
    """One parsed row of the input CSV.

    The sixteen raw CSV fields are stored under the names listed in
    ``__slots__``; ``price`` is converted to ``int`` and ``date`` to a
    ``datetime.date``. Three derived attributes are added:

    * ``month`` -- the first day of the month of ``date``;
    * ``postcode1`` / ``postcode2`` -- the two whitespace-separated halves
      of ``postcode``.

    A blank postcode yields empty ``postcode1``/``postcode2`` instead of
    raising, so such a row still counts towards columns whose expression
    does not look at the postcode.

    Raises:
        ValueError: if ``row`` does not have exactly sixteen fields, the
            price or date cannot be parsed, or a non-blank postcode does
            not split into exactly two parts.
    """
    __slots__ = ('guid price date postcode property_type new duration paon '
                 'saon street locality town district county ppd_type '
                 'record_status month postcode1 postcode2').split()

    def __init__(self, row):
        (self.guid, self.price, self.date, self.postcode, self.property_type,
         self.new, self.duration, self.paon, self.saon, self.street,
         self.locality, self.town, self.district, self.county, self.ppd_type,
         self.record_status) = row
        self.price = int(self.price)
        self.date = datetime.datetime.strptime(
            self.date, '%Y-%m-%d %H:%M').date()
        # Bucket key for the monthly aggregation: same year/month, day 1.
        self.month = self.date.replace(day=1)
        postcode_parts = self.postcode.split()
        if postcode_parts:
            # Still raises ValueError unless there are exactly two parts.
            self.postcode1, self.postcode2 = postcode_parts
        else:
            # Blank postcode: keep the record, leave both halves empty.
            self.postcode1 = self.postcode2 = ''
def parse_arguments(argv):
    """Parse the command line and return the resulting namespace.

    ``argv`` is the full argument vector including the program name at
    index 0, which is skipped. The namespace has ``verbose`` (bool flag,
    ``-v``/``--verbose``) and ``inputfiles`` (one or more paths).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument('-v', '--verbose', action='store_true', default=False)
    cli.add_argument('inputfiles', nargs='+')
    return cli.parse_args(args=argv[1:])
def yield_lines(inputfile, expression):
    """Yield the lines of ``inputfile`` that match ``expression``.

    The file is streamed through ``pv`` (labelled with the file name) and
    then through ``grep -E``, so filtering happens outside Python.

    Args:
        inputfile: path of the file to read.
        expression: extended regular expression; the empty string matches
            every line.

    Yields:
        Each matching line as text, including its line terminator.

    Raises:
        SystemExit: if ``pv`` exits non-zero, or ``grep`` exits with
            anything other than 0 or 1 (1 only means "no line matched").
    """
    pv = subprocess.Popen(['pv', '-petarcN', inputfile, inputfile],
                          stdout=subprocess.PIPE, shell=False)
    # ``grep -E`` is the supported spelling of ``egrep``; ``-e`` keeps a
    # pattern that starts with '-' from being parsed as an option.
    # universal_newlines=True makes the pipe yield text, which csv.reader
    # requires on Python 3 and which is harmless on Python 2.
    grep = subprocess.Popen(['grep', '-E', '-e', expression],
                            stdin=pv.stdout, stdout=subprocess.PIPE,
                            shell=False, universal_newlines=True)
    # Drop our copy of the pipe's read end so that pv receives SIGPIPE if
    # grep exits first, instead of blocking on a pipe nobody drains.
    pv.stdout.close()
    try:
        for line in grep.stdout:
            yield line
    finally:
        grep.stdout.close()
    pv_status = pv.wait()
    grep_status = grep.wait()
    if pv_status:
        raise SystemExit('pv of %s failed' % inputfile)
    if grep_status not in (0, 1):
        raise SystemExit('grep of %s failed' % inputfile)
def yield_records(filename, input_regex):
    """Yield a ``Record`` for every parseable CSV row of ``filename``.

    Lines are obtained from ``yield_lines(filename, input_regex)`` and
    parsed with ``csv.reader``. A row that ``Record`` rejects is logged at
    WARNING level (with its zero-based row index) and skipped, so one bad
    row does not abort the whole file.
    """
    rows = csv.reader(yield_lines(filename, input_regex))
    for index, row in enumerate(rows):
        # Only the constructor is guarded; the yield sits outside the try
        # so exceptions thrown into the generator are not swallowed here.
        try:
            record = Record(row)
        except Exception as error:
            logging.warning('%s:%s at %s:%s: %s',
                            type(error), error, filename, index, row)
        else:
            yield record
def aggregate_monthly_prices(month_to_column_to_prices):
    """Collapse per-month, per-column price lists into their medians.

    Args:
        month_to_column_to_prices: mapping ``month -> column -> prices``
            where ``prices`` is a sequence of numbers.

    Returns:
        A ``collections.defaultdict(dict)`` mapping
        ``month -> column -> numpy.median(prices)``. Columns whose price
        sequence is empty are omitted, because the median of nothing is
        NaN and cannot be converted to ``int`` later on.
    """
    month_to_column_to_median = collections.defaultdict(dict)
    for month, columns in month_to_column_to_prices.items():
        for column, prices in columns.items():
            if len(prices) == 0:
                continue
            month_to_column_to_median[month][column] = numpy.median(prices)
    return month_to_column_to_median
def process_file(input_regex, column_expressions, filename):
    """Compute monthly median prices per output column for one input file.

    Args:
        input_regex: pattern passed through to ``yield_records`` to
            pre-filter input lines.
        column_expressions: mapping of column name -> Python expression
            source. Each expression is evaluated with ``record`` bound to
            the current record; a truthy result adds the record's price to
            that column for the record's month.
        filename: path of the file to process.

    Returns:
        The result of ``aggregate_monthly_prices``: a mapping
        ``month -> column -> median price``.
    """
    # NOTE(security): the expressions are executed with eval(). They must
    # come from trusted configuration only, never from external input.
    # Compiling once up front avoids re-parsing every expression for
    # every record, and surfaces syntax errors before any data is read.
    compiled_columns = [
        (column_name, compile(expression, '<column %s>' % column_name, 'eval'))
        for column_name, expression in column_expressions.items()
    ]
    month_to_column_to_prices = collections.defaultdict(
        functools.partial(collections.defaultdict, list))
    for record in yield_records(filename, input_regex):
        for column_name, code in compiled_columns:
            if eval(code, {'record': record}):
                month_to_column_to_prices[
                    record.month][column_name].append(record.price)
    return aggregate_monthly_prices(month_to_column_to_prices)
def main(options):
    """Process every input file in parallel and print a CSV of medians.

    One worker process is started per input file. The per-file results are
    combined and written to standard output as CSV with a ``Date`` column
    followed by one column per ``CONFIGURATION.COLUMN_EXPRESSIONS`` key,
    one row per month in ascending order. Medians are truncated to ``int``.

    Args:
        options: namespace with ``verbose`` (bool) and ``inputfiles``
            (non-empty list of paths), as returned by ``parse_arguments``.
    """
    logging.basicConfig(
        level=logging.WARNING if options.verbose else logging.ERROR)
    worker = functools.partial(process_file, CONFIGURATION.INPUT_REGEX,
                               CONFIGURATION.COLUMN_EXPRESSIONS)
    pool = multiprocessing.Pool(len(options.inputfiles))
    try:
        results = pool.map(worker, options.inputfiles)
        pool.close()
    except BaseException:
        # Do not leave worker processes running behind a failure.
        pool.terminate()
        raise
    finally:
        pool.join()

    month_to_column_to_median = collections.defaultdict(dict)
    for result in results:
        # NOTE: medians from different files cannot be combined after the
        # fact, so a month that occurs in several files keeps only the
        # values of the last file listed. Make that visible.
        for month in result:
            if month in month_to_column_to_median:
                logging.warning(
                    'month %s appears in more than one input file; '
                    'keeping the medians of the later file', month)
        month_to_column_to_median.update(result)

    # list(...) is required on Python 3, where keys() is not a list.
    columns = ['Date'] + list(CONFIGURATION.COLUMN_EXPRESSIONS)
    writer = csv.DictWriter(sys.stdout, columns)
    writer.writeheader()
    for month in sorted(month_to_column_to_median):
        data = {column: int(median) for column, median in
                month_to_column_to_median[month].items()}
        data['Date'] = month.strftime('%Y/%m/%d')
        writer.writerow(data)
if __name__ == '__main__':
    # Script entry point: parse the command line, then run the pipeline.
    cli_options = parse_arguments(sys.argv)
    main(cli_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment