-
-
Save laranea/34734dce7bd8ebd63d45 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import election_data | |
from match import match | |
'''
Match BBC constituency names with TheyWorkForYou constituency names,
using MP name and party as extra cues
'''
# Pull the three datasets; each is expected to be a list of per-record
# dicts (field names differ per source -- see the *_fields tuples, which
# name the party / MP / constituency keys in each dataset).
twfy_data = election_data.get_twfy_data()
twfy_fields = ('party', 'name', 'constituency')
bbc_data = election_data.get_bbc_data()
bbc_fields = ('party', 'mp', 'constituency')
guardian_data = election_data.get_guardian_data()
guardian_fields = ('party', 'mp', 'constituency')
# the guardian data is missing a constituency! Add a stub
# (keeps the lists the same length so the 1:1 matcher can pair them up)
guardian_data.append({
    'party': u'',
    'mp': u'',
    'constituency': u'',
})
def blue(x): return '\033[94m%s\033[0m' % x | |
def yellow(x): return '\033[93m%s\033[0m' % x | |
# then perform the match: pair each pair of datasets 1:1 and print the
# matched constituency names side by side (first source in blue, second
# in yellow) so mismatched spellings can be eyeballed.
m = match(twfy_data, bbc_data, fields=zip(twfy_fields, bbc_fields))
for x, y in m:
    print('TWFY: "%s"\n BBC: "%s"\n' % (blue(x['constituency']), yellow(y['constituency'])))
m = match(twfy_data, guardian_data, fields=zip(twfy_fields, guardian_fields))
for x, y in m:
    print(' TWFY: "%s"\nGuardian: "%s"\n' % (blue(x['constituency']), yellow(y['constituency'])))
m = match(bbc_data, guardian_data, fields=zip(bbc_fields, guardian_fields))
for x, y in m:
    print(' BBC: "%s"\nGuardian: "%s"\n' % (blue(x['constituency']), yellow(y['constituency'])))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json, urllib2 | |
''' | |
load MP and constituency data from the TheyWorkForYou API | |
''' | |
def get_twfy_data():
    """Fetch the list of MPs from the TheyWorkForYou getMPs API.

    Returns the parsed JSON payload (decoded as latin-1).
    Requires a valid API key in twfy_api_key below.
    """
    from contextlib import closing
    # TheyWorkForYou API key goes here
    twfy_api_key = ''
    # Setting the date to 1 June 2010 gets data for all constituencies
    twfy_mps_url = 'http://www.theyworkforyou.com/api/getMPs?date=2010-06-01&key=%s' % twfy_api_key
    # close the HTTP response even if json.load raises (the original
    # leaked the open connection)
    with closing(urllib2.urlopen(twfy_mps_url)) as twfy_obj:
        return json.load(twfy_obj, 'latin-1')
''' | |
load BBC election 2010 data, via a scraperwiki scraper | |
''' | |
def _compute_bbc_winners(election_results):
    """Reduce per-candidate result rows to one record per seat.

    Each returned record carries 'candidates' (a list of
    (candidate, party) tuples) plus 'mp', 'party' and 'constituency'
    for the candidate with the highest vote count seen for that seat.
    """
    election_winners = {}
    max_votes = {}
    for x in election_results:
        # there are some blank entries for some reason
        if x['candidate'] is None:
            continue
        seat = x['seat']
        if seat not in election_winners:
            election_winners[seat] = {'candidates': []}
        election_winners[seat]['candidates'].append((x['candidate'], x['party']))
        try:
            votes = int(x['votes'])
        except ValueError:
            # this fixes some gaps in the scraped data: known winners
            # get a large vote count, everyone else is ranked last
            if x['candidate'] in ['Teresa Pearce', 'Anne McIntosh']:
                votes = 50000
            else:
                votes = -1
        # keep the highest-vote candidate seen so far as the seat winner
        if seat not in max_votes or votes > max_votes[seat]:
            max_votes[seat] = votes
            election_winners[seat].update({
                'mp': x['candidate'],
                'party': x['party'],
                'constituency': x['seat'],
            })
    return election_winners.values()

def get_bbc_data():
    """Load BBC election 2010 data via a scraperwiki scraper (falling
    back to a static mirror on HTTP error) and return one winning
    record per constituency."""
    bbc_scraper_url = 'https://api.scraperwiki.com/api/1.0/datastore/sqlite?format=jsondict&name=2010-general-election-results&query=select%20*%20from%20%60swdata%60'
    try:
        bbc_obj = urllib2.urlopen(bbc_scraper_url)
    except urllib2.HTTPError:
        print('Problem fetching data from scraperwiki. Trying mirror...')
        backup_url = 'http://f.cl.ly/items/2X1y0c0R2x2W1d390S2c/2010-general-election-results.json'
        bbc_obj = urllib2.urlopen(backup_url)
    try:
        election_results = json.load(bbc_obj)
    finally:
        # the original left the HTTP response open
        bbc_obj.close()
    # figure out which MP won in each of the BBC constituencies
    return _compute_bbc_winners(election_results)
''' | |
load Guardian election 2010 data, via google spreadsheet: | |
http://www.theguardian.com/news/datablog/2010/may/07/uk-election-results-data-candidates-seats#data | |
''' | |
def get_guardian_data():
    """Load Guardian election 2010 data from a public Google
    spreadsheet (cached locally in worksheet.json) and return one
    record per seat, each with a 'candidates' list plus winner fields.
    """
    worksheet_fn = 'worksheet.json'
    gdoc_key = '0AonYZs4MzlZbdGRMdXRfZ08wcW9fQzBKZXZJeG5aMmc'
    worksheet_url = 'https://spreadsheets.google.com/feeds/list/%s/od7/public/values?alt=json' % gdoc_key
    # load google spreadsheet a URL, with caching
    def load_or_fetch(filename, url):
        # prefer the cached copy; on a cache miss, fetch and write it back
        try:
            with open(filename) as f:
                raw = f.read()
        except IOError:
            response = urllib2.urlopen(url)
            try:
                raw = response.read()
            finally:
                # the original left the HTTP response open
                response.close()
            with open(filename, 'w') as f:
                f.write(raw)
        return raw
    worksheet_raw = load_or_fetch(worksheet_fn, worksheet_url)
    worksheet_json = json.loads(worksheet_raw)['feed']['entry']
    # spreadsheet cells arrive under 'gsx$<column>' keys; strip the prefix
    all_candidates = [{k[4:]: v['$t'] for k, v in x.items() if k[:4] == 'gsx$'} for x in worksheet_json]
    results = {}
    # the candidate properties we're interested in
    can_props = ['candidate', 'sittingmember', 'vote', 'vote_2', 'party']
    # mapping to custom naming convention
    repl = {'seat': 'constituency', 'candidate': 'name', 'winner': 'party', 'vote_2': 'percent_vote'}
    # the constituency properties we're interested in
    con_props = ['candidate', 'winner', 'seat', 'change', 'turnout', 'majority', 'lab-ldswing', 'c-ldswing', 'lab-cswing', 'holder-winner']
    # more renaming
    con_repl = repl.copy()
    con_repl.update({'candidate': 'mp'})
    for x in all_candidates:
        seat = x['seat']
        if seat not in results:
            results[seat] = {
                'candidates': [],
            }
        # repl.get(k, k) renames mapped keys and keeps the rest as-is
        # (replaces the original's opaque (k, repl.get(k))[k in repl] trick)
        results[seat]['candidates'].append({
            repl.get(k, k): v for k, v in x.items() if k in can_props
        })
        # a '*' in the 'elected' column marks the seat's winner
        if x['elected'] == '*':
            results[seat].update({
                con_repl.get(k, k): v for k, v in x.items() if k in con_props
            })
    return results.values()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Levenshtein as edit | |
''' | |
Python module for solving assignment problems: | |
<http://en.wikipedia.org/wiki/Assignment_problem> | |
Example usage: | |
m = match.match(mps, election_winners, fields=[('party', 'party'), ('name', 'mp'), ('constituency', 'constituency')], algorithm='greedy') | |
''' | |
''' Compare similarity of two strings using Levenshtein distance. ''' | |
def cmp_strs(x, y):
    """Similarity of two strings as a Levenshtein ratio in [0, 1]."""
    return edit.ratio(x, y)
''' Compare similarity of two dicts using Levenshtein distance between | |
dict values. ''' | |
def cmp_dicts(x, y, fields):
    """Mean Levenshtein similarity across paired values of two dicts.

    fields is an iterable of (x_key, y_key) pairs; when None, every key
    of x is compared against the same key in y.
    """
    if fields is None:
        fields = zip(x.keys(), x.keys())
    total = 0.0
    count = 0
    for xf, yf in fields:
        total += edit.ratio(x[xf], y[yf])
        count += 1
    return total / count
''' Greedy algorithm for solving the problem.
Unlike Hungarian, greedy is not guaranteed to find the best solution,
but it is simpler and fast enough, and is fine in most instances.
'''
def compute_greedy(z):
    """Greedily solve the assignment problem for a square score matrix.

    Repeatedly picks the highest remaining score, records its
    (row, col) pair, then blanks out that row and column with the
    sentinel -1.  Assumes all real scores are >= 0 (Levenshtein ratios
    lie in [0, 1]), so -1 unambiguously marks exhausted entries.

    Returns a list of (row, col) index pairs, best match first.
    """
    from numpy import matrix
    indexes = []
    num_items = len(z)
    m = matrix(z)
    blank_row = matrix([-1] * num_items)
    blank_col = blank_row.transpose()
    while True:
        # flat index of the best remaining score
        max_index = m.argmax()
        score = m.item(max_index)
        if score == -1:
            # everything is matched (or blanked); done
            break
        # divmod keeps both indices integral on python 2 AND 3 -- the
        # original "max_index / num_items" yields a float row index
        # under python 3 true division, breaking the row assignment
        l = divmod(max_index, num_items)
        m[l[0], :] = blank_row
        m[:, l[1]] = blank_col
        indexes.append(l)
    return indexes
''' Given two lists of dicts that map 1:1, order them so they match up. | |
Returns a list of tuples. ''' | |
def match(x, y, fields=None, algorithm='greedy'):
    """Given two lists of strings or dicts that map 1:1, pair them up.

    fields is an iterable of (x_key, y_key) pairs used when matching
    dicts; algorithm is 'greedy' (default) or 'hungarian'.
    Returns a list of (x_item, y_item) tuples (ordered most-confident
    first by the greedy algorithm).
    Raises Exception for an unknown algorithm name.
    """
    # materialize fields once: an iterator argument (e.g. a py3 zip)
    # would otherwise be exhausted after the first cmp_dicts call below
    if fields is not None:
        fields = list(fields)
    # isinstance is the idiomatic check (the original compared
    # type(x[0]) is type(''), which rejects str subclasses)
    if isinstance(x[0], str):
        z = [[cmp_strs(a, b) for b in y] for a in x]
    else:
        z = [[cmp_dicts(a, b, fields) for b in y] for a in x]
    if algorithm == 'greedy':
        indexes = compute_greedy(z)
    elif algorithm == 'hungarian':
        from munkres import Munkres
        m = Munkres()
        indexes = m.compute(z)
    else:
        raise Exception('Unknown algorithm: "%s".' % algorithm)
    # matches are ordered by confidence, most confident first
    return [(x[a], y[b]) for a, b in indexes]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment