Created
June 26, 2016 10:07
-
-
Save yaniv-aknin/6ffba866990689f74d300faedb1a133a to your computer and use it in GitHub Desktop.
Process UK Land Registry data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/local/opt/playground/bin/python | |
""" | |
Helper tool to process the UK Land Registry data | |
https://www.gov.uk/government/publications/land-registry-data/public-data | |
""" | |
import numpy | |
import functools | |
import multiprocessing | |
import csv | |
import datetime | |
import collections | |
import subprocess | |
import sys | |
import argparse | |
import logging | |
class CONFIGURATION:
    # A regex applied with an egrep coprocess to all input lines; this is a
    # premature optimisation trying to get fewer lines into Python.  The
    # empty string matches every line, i.e. no pre-filtering.
    INPUT_REGEX = ''
    # A mapping of column names to Python expressions.  Every key will become
    # an output column.  The expression will be evaluated per record that
    # passed the input_regex filtering above, and if the expression is True,
    # this record will be included in the price median in this column.
    # OrderedDict so output columns keep this declaration order.
    COLUMN_EXPRESSIONS = collections.OrderedDict((
        ('UK', 'True'),
        ('LONDON', 'record.county == "GREATER LONDON"'),
        ('SW11', 'record.postcode1 == "SW11"'),
    ))
class Record(object):
    """One row of the UK Land Registry price-paid CSV, parsed into typed fields."""

    # __slots__ keeps per-instance memory low when parsing millions of rows.
    __slots__ = (
        'guid', 'price', 'date', 'postcode', 'property_type', 'new',
        'duration', 'paon', 'saon', 'street', 'locality', 'town',
        'district', 'county', 'ppd_type', 'record_status',
        'month', 'postcode1', 'postcode2',
    )

    def __init__(self, row):
        """Parse one 16-column CSV row; raises on malformed rows."""
        (self.guid, raw_price, raw_date, self.postcode, self.property_type,
         self.new, self.duration, self.paon, self.saon, self.street,
         self.locality, self.town, self.district, self.county,
         self.ppd_type, self.record_status) = row
        self.price = int(raw_price)
        self.date = datetime.datetime.strptime(raw_date, '%Y-%m-%d %H:%M').date()
        # First-of-month bucket used for the monthly aggregation downstream.
        self.month = self.date.replace(day=1)
        # e.g. "SW11 1AA" -> ("SW11", "1AA"); raises if the postcode has no space.
        self.postcode1, self.postcode2 = self.postcode.split()
def parse_arguments(argv):
    """Parse command-line options; argv[0] is the program name and is skipped."""
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('-v', '--verbose', action='store_true', default=False)
    arg_parser.add_argument('inputfiles', nargs='+')
    return arg_parser.parse_args(argv[1:])
def yield_lines(inputfile, expression):
    """Yield the lines of *inputfile* that match the egrep *expression*.

    Runs a ``pv | egrep`` pipeline: pv provides a progress meter on stderr
    while egrep cheaply pre-filters lines before they reach Python.

    Raises SystemExit naming the failing process if either exits non-zero
    (note: egrep exits 1 when no line matched at all).
    """
    pv = subprocess.Popen(['pv', '-petarcN', inputfile, inputfile],
                          stdout=subprocess.PIPE, shell=False)
    # universal_newlines=True makes grep.stdout yield text (str) lines so the
    # downstream csv.reader works on Python 3 as well as Python 2.
    grep = subprocess.Popen(['egrep', expression], stdin=pv.stdout,
                            stdout=subprocess.PIPE, shell=False,
                            universal_newlines=True)
    # Close our copy of the pipe so pv receives SIGPIPE if egrep exits early.
    pv.stdout.close()
    for line in grep.stdout:
        yield line
    # Bug fix: the original message blamed 'pv' regardless of which process
    # actually failed; report the real culprit.
    for name, process in (('pv', pv), ('egrep', grep)):
        if process.wait():
            raise SystemExit('%s of %s failed' % (name, inputfile))
def yield_records(filename, input_regex):
    """Yield Record objects parsed from *filename*, pre-filtered by *input_regex*.

    Malformed rows (wrong column count, bad price/date, spaceless postcode)
    are logged as warnings and skipped rather than aborting the whole run.
    """
    for index, row in enumerate(csv.reader(yield_lines(filename, input_regex))):
        try:
            yield Record(row)
        # Bug fix: 'except Exception, error' is Python-2-only syntax and a
        # SyntaxError on Python 3; the 'as' form works on both.
        except Exception as error:
            logging.warning('%s:%s at %s:%s: %s',
                            type(error), error, filename, index, row)
def aggregate_monthly_prices(month_to_column_to_prices):
    """Collapse ``{month: {column: [price, ...]}}`` into ``{month: {column: median}}``.

    Medians are numpy floats; empty price lists are never produced upstream
    (a column key only exists once a price was appended).
    """
    month_to_column_to_median = collections.defaultdict(dict)
    # Bug fix: .iteritems() is Python-2-only (AttributeError on Python 3);
    # .items() behaves identically on both.
    for month, columns in month_to_column_to_prices.items():
        for column, prices in columns.items():
            month_to_column_to_median[month][column] = numpy.median(prices)
    return month_to_column_to_median
def process_file(input_regex, column_expressions, filename):
    """Aggregate one input file into ``{month: {column: median_price}}``.

    For every record that survived the egrep pre-filter, each column
    expression is evaluated with the record in scope; matching records
    contribute their price to that column's monthly median.
    """
    month_to_column_to_prices = collections.defaultdict(
        functools.partial(collections.defaultdict, list))
    for record in yield_records(filename, input_regex):
        # Bug fix: .iteritems() is Python-2-only; .items() works on both.
        for column_name, column_expression in column_expressions.items():
            # SECURITY NOTE: eval() of expressions from CONFIGURATION only —
            # never feed user-supplied strings through this path.
            if eval(column_expression, {'record': record}):
                month_to_column_to_prices[
                    record.month][column_name].append(record.price)
    return aggregate_monthly_prices(month_to_column_to_prices)
def main(options):
    """Process every input file in parallel and write a monthly-median CSV to stdout.

    Spawns one worker per input file, merges the per-file results, then
    emits one row per month with one column per configured expression.
    """
    logging.basicConfig(
        level=logging.WARNING if options.verbose else logging.ERROR)
    pool = multiprocessing.Pool(len(options.inputfiles))
    try:
        partial = functools.partial(process_file, CONFIGURATION.INPUT_REGEX,
                                    CONFIGURATION.COLUMN_EXPRESSIONS)
        month_to_column_to_median = collections.defaultdict(dict)
        # Later files win on month collisions — the original dict.update()
        # merge semantics are preserved.
        for result in pool.map(partial, options.inputfiles):
            month_to_column_to_median.update(result)
    finally:
        # Bug fix: the pool was never closed, leaking worker processes.
        pool.close()
        pool.join()
    # Bug fix: list(...) so the concatenation also works on Python 3, where
    # .keys() returns a view rather than a list.
    columns = ['Date'] + list(CONFIGURATION.COLUMN_EXPRESSIONS.keys())
    writer = csv.DictWriter(sys.stdout, columns)
    writer.writeheader()
    for month in sorted(month_to_column_to_median):
        # Bug fix: .iteritems() is Python-2-only; .items() works on both.
        data = {column: int(median) for column, median in
                month_to_column_to_median[month].items()}
        data['Date'] = month.strftime('%Y/%m/%d')
        writer.writerow(data)
if __name__ == '__main__':
    # Entry point: parse CLI flags, fan out over the input files, write CSV
    # to stdout.  Exits non-zero via the SystemExit raised in yield_lines
    # when the pv/egrep pipeline fails.
    main(parse_arguments(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment