Last active
January 11, 2017 21:22
-
-
Save jakeemerson/30aac778c31b4d947e7666896efddc82 to your computer and use it in GitHub Desktop.
A (hopefully) complete listing of the functions required to run a ranked choice voting simulation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scipy.stats as ss | |
from scipy.stats import norm, beta, zscore, gamma | |
import numpy as np | |
from math import sqrt | |
from itertools import chain, permutations | |
# the functions below aim to replicate the R function here: | |
# http://stat.ethz.ch/R-manual/R-patched/library/stats/html/power.prop.test.html | |
def sample_power_probtest(p1, p2, power=0.8, sig=0.05): | |
z = norm.isf([sig / 2]) # two-sided t test | |
zp = -1 * norm.isf([power]) | |
d = (p1 - p2) | |
s = 2 * ((p1 + p2) / 2) * (1 - ((p1 + p2) / 2)) | |
n = s * ((zp + z) ** 2) / (d ** 2) | |
return int(round(n[0])) | |
def sample_power_difftest(d, s, power=0.8, sig=0.05): | |
z = norm.isf([sig / 2]) | |
zp = -1 * norm.isf([power]) | |
n = 2 * ((zp + z) ** 2) / (d ** 2) | |
return int(round(n[0])) | |
def ab(m, s): | |
"""convert mean and std to shape params for the beta distribution""" | |
a = ((1 - m) / (s ** 2) - 1 / m) * m ** 2 | |
b = a * (1 / m - 1) | |
return a, b | |
def gab(m, s): | |
""" | |
shape and scale of the gamma distribution | |
Args: | |
m: mean of the sample | |
s: standard deviation of the sample | |
Returns: | |
""" | |
a = (1.0 * m / s) ** 2 | |
b = (s ** 2) / (1.0 * m) | |
return a, b | |
def to_unity(values): | |
s = 1. * sum(values) | |
return [i / s for i in values] | |
def cohens_d(means=(), stdevs=(), N_samples=()): | |
""" | |
cohens_d giving the effect size between group 1 and group 2 | |
see http://en.wikipedia.org/wiki/Effect_size#Cohen.27s_d | |
:param means: tuple with (M1,M2) | |
:param stdevs: tuple with (V1,V2) | |
:param N_samples: tuple with (N1,N2) | |
:return: float, effect size | |
""" | |
m = means | |
s = stdevs | |
n = N_samples | |
S = sqrt(((n[0] - 1) * s[0] ** 2 + (n[1] - 1) * s[1] ** 2) / (n[0] + n[1] - 2)) | |
return (m[0] - m[1]) / S | |
def assess_sample_differences(sample1, sample2, power=None, sig=0.05): | |
z = gamma.isf | |
pass | |
def exhausted(ballot, eliminated): | |
""" | |
ballot is a string of letters, each letter corresponding to a candidate | |
eliminated is a string of letters, each corresponding to an eliminated candidate | |
this function returns true if the ballot does not rank any continuing candidate, | |
contains an overvote at the highest continuing ranking or contains 2 or more | |
sequential skipped rankings before its highest continuing ranking. | |
Exhausted ballots are not counted for any continuing candidate. | |
""" | |
is_exhausted = False | |
remaining_candidates = [item for item in ballot if item not in eliminated] | |
# does the ballot fail to rank any continuing candidate? | |
if len(remaining_candidates) == 0 or set(remaining_candidates) == {' '}: | |
is_exhausted = True | |
# does the ballot contain an overvote at the highest continuing ranking? | |
# WE CAN'T TEST THIS WITH THE STRING SCHEME SINCE THERE IS ONLY ONE POSITION PER RANK | |
# does the ballot contain 2 or more sequential skipped rankings before its | |
# highest continuing ranking? | |
if remaining_candidates[0:2] == [' ', ' ']: | |
is_exhausted = True | |
# get rid of any blanks | |
remaining_candidates = ''.join([item for item in remaining_candidates if item != ' ']) | |
return is_exhausted, remaining_candidates | |
class RankedRace: | |
def __init__(self, ballots): | |
""" | |
A RankedRace takes a list of ballots, where each ballot is a string of letters, | |
each letter corresponding to a candidate (or a blank space) in a rank position. | |
""" | |
self.ballots = ballots | |
self.candidates = set(chain.from_iterable(ballots)) - {' '} | |
self.to_eliminate = '' # all the candidates eliminated so far | |
self.eliminated = '' # candidate eliminated in the last round | |
self.results = {candidate: 0 for candidate in self.candidates} | |
for ballot in self.ballots: | |
is_exhausted, remaining_candidates = exhausted(ballot, self.to_eliminate) | |
if not is_exhausted: | |
# count the top candidate on the ballot | |
candidate = remaining_candidates[0] | |
self.results[candidate] += 1 | |
# this stores the results from the candidate who was eliminated in the prior round | |
self.intermediate = {} | |
self.round_number = 0 | |
self.total_ballots = len(ballots) | |
def run(self): | |
""" | |
Run one round of the race vote following the algorithm laid out in the proposed | |
legislation. A round eliminates any candidate with too few rank-1 votes to win. | |
""" | |
# if any one candidate has a majority, then don't continue | |
for candidate, votes in self.results.iteritems(): | |
if votes > self.total_ballots / 2.: | |
self.results = {candidate: votes} | |
return | |
# recalculate the results every time | |
self.results = {candidate: 0 for candidate in self.candidates if candidate not in self.to_eliminate} | |
self.intermediate = {} | |
for ballot in self.ballots: | |
is_exhausted, remaining_candidates = exhausted(ballot, self.to_eliminate) | |
if not is_exhausted: | |
# count the top candidate on the ballot | |
candidate = remaining_candidates[0] | |
self.results[candidate] += 1 | |
# how do the intermediate results get redistributed? | |
if self.to_eliminate not in ['', ' ']: | |
current_vote = remaining_candidates[0] | |
candidate_indices = {} | |
for idx, c in enumerate(ballot): | |
if c not in candidate_indices and c != ' ': | |
candidate_indices[c] = idx | |
current_idx = candidate_indices[current_vote] | |
try: | |
eliminated_idx = candidate_indices[self.eliminated] | |
except KeyError: | |
# if the eliminated candidate is not on this ballot ... | |
continue | |
try: | |
if self.eliminated == 'S' and ballot[0] == 'S': | |
stuff = 1 | |
if eliminated_idx < current_idx: | |
self.intermediate[current_vote] += 1 | |
except KeyError: | |
if self.eliminated == 'S' and ballot[0] == 'S': | |
stuff = 1 | |
self.intermediate[current_vote] = 1 | |
if self.eliminated == 'S': | |
stuff = 1 | |
# find the last place candidate | |
lowest_votes = min([v for c, v in self.results.iteritems()]) | |
lowest_candidate = '' | |
for candidate, votes in self.results.iteritems(): | |
if votes == lowest_votes: | |
lowest_candidate = candidate | |
self.results.pop(lowest_candidate) | |
self.to_eliminate += lowest_candidate | |
self.eliminated = lowest_candidate | |
self.round_number += 1 | |
def elect(self): | |
""" | |
Run all the required rounds to elect a winner following the proposed legislation. | |
""" | |
while len(self.results) > 1: | |
self.run() | |
return self.results | |
def order_prob(ballots): | |
candidates = list(set(chain.from_iterable(ballots)) - {' '}) | |
order_counts = {} | |
for perm_tuple in permutations(candidates, 2): | |
perm = ''.join(perm_tuple) | |
order_counts[perm] = 0 | |
for ballot in ballots: | |
# if the same candidates are in the permutation as the ballot | |
# same_candidates = ''.join([item for item in ballot if item in perm]) | |
# and if they are in the same order | |
# if same_candidates == perm: | |
# if the pair is a substring in the ballot, that is the second candidate immediately follows the first | |
if perm in ballot: | |
order_counts[perm] += 1 | |
# calculate the fraction of the total for each permutation | |
total = 1. * sum([v for k, v in order_counts.iteritems()]) | |
order_probs = {} | |
for k, v in order_counts.iteritems(): | |
order_probs[k] = round(v / total, 3) | |
return order_probs | |
def bootstrap(bs_function, list, niters=1000): | |
""" | |
bootstrap runs a bootstrapping algorithm on a generic function. Returns | |
an array of bootstrapped results of length [niters]. | |
bootstrap takes a function that takes a single parameter, which is a list. | |
The list is the second argument and it is randomly sampled from with | |
replacement [niters] times. If the function takes more than 1 argument, | |
it must be curried (see http://stackoverflow.com/a/1352865/347815) | |
beforehand to run this function. | |
The results are of length [niters] and consist of the result from each | |
bootstrapping iteration. The mean of these results should converge to the | |
true mean of the data, and the standard deviation of these results should | |
converge to the standard error of the data. | |
""" | |
results = [] | |
for i in range(niters): | |
# choose the data randomly | |
rdata = np.random.choice(list, size=1, replace=True, p=None) | |
res = bs_function(rdata) | |
results.append(res) | |
return results | |
def ibootstrap_sample(data, niters): | |
""" | |
Generates [nsamples] samples by selecting with replacement from [thelist], | |
returning a new list of the same size as the original on each iteration. | |
""" | |
array_list = np.array(data) | |
indices = np.random.randint(0, len(data), (len(data), niters)) | |
for si in range(niters): | |
yield array_list[indices[:, si]] | |
def simulation(candidates, order_distributions): | |
ballots = [] | |
all_candidates = [k for k in candidates] | |
# create a ballot for each vote in the candidates dictionary | |
# the 1st spot is taken by the given candidate, then the latter | |
# ranks are filled according to the preference probabilities | |
for candidate in candidates: | |
my_pair_prefs = {k[1]: dist for k, dist in order_distributions.iteritems() if k[0] == candidate} | |
others = my_pair_prefs.keys() | |
p = to_unity([my_pair_prefs[o].rvs() for o in others]) | |
for i in range(candidates[candidate]['votes']): | |
latter = np.random.choice(others, size=4, p=p) | |
ballot = [candidate] | |
ballot.extend(latter) | |
ballots.append(''.join(ballot)) | |
return ballots | |
def bootstrap(values): | |
random_samples = ibootstrap_sample(values, 1000) | |
order_samples = {k:[] for k in order_prob(values).keys()} | |
for sample in random_samples: | |
for k, v in order_prob(sample).iteritems(): | |
order_samples[k].append(v) | |
return order_samples | |
def make_distributions(order_samples, values): | |
order_distributions = {k:None for k in order_prob(values).keys()} | |
for k, v in order_samples.iteritems(): | |
pair_probabilities = np.array(v) | |
expected = pair_probabilities.mean() | |
stdev = pair_probabilities.std() | |
order_distributions[k] = ss.beta(*ab(expected, stdev)) | |
return order_distributions | |
if __name__ == '__main__': | |
import pandas as pd | |
col_names = [u'Timestamp', | |
u'Username', | |
u'2010_1', | |
u'2010_2', | |
u'2010_3', | |
u'2010_4', | |
u'2010_5', | |
u'2014_1', | |
u'2014_2', | |
u'2014_3', | |
u'affiliation', | |
u'follow_up'] | |
df = pd.read_csv('../data/processed/Responses_to_RCV_form_no_email.csv', sep='|', names=col_names, header=0) | |
candidate_dict = {'Libby Mitchell (Democrat)': 'M', | |
'Eliot Cutler (Independent)': 'C', | |
'Paul LePage (Republican)': 'L', | |
'Kevin Scott (Independent)': 'S', | |
'Shawn Moody (Independent)': 'D'} | |
def summarize_ranks_2010(row): | |
""" | |
Take the list of candidates names and return a string of letters | |
E.g., 'LCMSD' | |
""" | |
candidate_dict = {'Libby Mitchell (Democrat)': 'M', | |
'Eliot Cutler (Independent)': 'C', | |
'Paul LePage (Republican)': 'L', | |
'Kevin Scott (Independent)': 'S', | |
'Shawn Moody (Independent)': 'D'} | |
cols = [] | |
for i in range(1, 6): | |
cols.append('2010_{}'.format(str(i))) | |
summary = [] | |
for c in cols: | |
try: | |
abbr = candidate_dict[row[c]] | |
except KeyError: | |
abbr = ' ' | |
summary.append(abbr) | |
return ''.join(summary) | |
df['2010_ranks'] = df.apply(summarize_ranks_2010, axis=1) | |
order_samples_2010 = bootstrap(df['2010_ranks'].values) | |
order_distributions_2010 = make_distributions(order_samples_2010, df['2010_ranks'].values) | |
candidates_2010 = {'L': {'name': 'LePage', 'votes': 218605}, | |
'M': {'name': 'Mitchell', 'votes': 109387}, | |
'C': {'name': 'Cutler', 'votes': 208270}, | |
'D': {'name': 'Moody', 'votes': 28756}, | |
'S': {'name': 'Scott', 'votes': 5664}, | |
} | |
ballots = simulation(candidates_2010, order_distributions_2010) | |
r = RankedRace(ballots) | |
links = [['source','target','value']] | |
while len(r.results) > 1: | |
print '---Round {} results---'.format(r.round_number+1) | |
for c in r.results: | |
links.append([ | |
'{0}{1}'.format(candidates_2010[c]['name'], r.round_number), | |
'{0}{1}'.format(candidates_2010[c]['name'], r.round_number+1), | |
r.results[c] | |
]) | |
print '{0}: {1:,}'.format(candidates_2010[c]['name'], r.results[c]) | |
elim = r.eliminated | |
r.run() | |
try: | |
print '{0} was eliminated in round {1}. Votes were re-allocated as follows:'.format(candidates_2010[elim]['name'], r.round_number-1) | |
for a in r.intermediate: | |
links.append([ | |
'{0}{1}'.format(candidates_2010[elim]['name'], r.round_number-1), | |
'{0}{1}'.format(candidates_2010[a]['name'], r.round_number), | |
r.intermediate[a] | |
]) | |
print '\t{0}: {1:,}'.format(candidates_2010[a]['name'], r.intermediate[a]) | |
except KeyError: | |
continue | |
winner, abbr = [[k, v] for k, v in candidate_dict.iteritems() if v in r.results.keys()][0] | |
votes = r.results[abbr] | |
print 'The winner is {0} with {1:,} votes.'.format(winner, votes) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is what the output looks like...
`
---Round 1 results---
Cutler: 208,270
Mitchell: 109,387
Scott: 5,664
Moody: 28,756
LePage: 218,605
---Round 2 results---
Cutler: 208,270
Mitchell: 109,387
Moody: 28,756
LePage: 218,605
Scott was eliminated in round 1. Votes were re-allocated as follows:
Cutler: 412
Mitchell: 463
Moody: 2,107
LePage: 2,682
---Round 3 results---
Cutler: 208,682
Mitchell: 109,850
LePage: 221,287
Moody was eliminated in round 2. Votes were re-allocated as follows:
Cutler: 6,439
Mitchell: 6,067
LePage: 16,087
---Round 4 results---
Cutler: 215,121
LePage: 237,374
Mitchell was eliminated in round 3. Votes were re-allocated as follows:
Cutler: 89,004
LePage: 16,825
The winner is Eliot Cutler (Independent) with 304,125 votes.
`