Skip to content

Instantly share code, notes, and snippets.

@jakeemerson
Last active January 11, 2017 21:22
Show Gist options
  • Save jakeemerson/30aac778c31b4d947e7666896efddc82 to your computer and use it in GitHub Desktop.
Save jakeemerson/30aac778c31b4d947e7666896efddc82 to your computer and use it in GitHub Desktop.
A (hopefully) complete listing of the functions required to run a ranked choice voting simulation
import scipy.stats as ss
from scipy.stats import norm, beta, zscore, gamma
import numpy as np
from math import sqrt
from itertools import chain, permutations
# the functions below aim to replicate the R function here:
# http://stat.ethz.ch/R-manual/R-patched/library/stats/html/power.prop.test.html
def sample_power_probtest(p1, p2, power=0.8, sig=0.05):
z = norm.isf([sig / 2]) # two-sided t test
zp = -1 * norm.isf([power])
d = (p1 - p2)
s = 2 * ((p1 + p2) / 2) * (1 - ((p1 + p2) / 2))
n = s * ((zp + z) ** 2) / (d ** 2)
return int(round(n[0]))
def sample_power_difftest(d, s, power=0.8, sig=0.05):
z = norm.isf([sig / 2])
zp = -1 * norm.isf([power])
n = 2 * ((zp + z) ** 2) / (d ** 2)
return int(round(n[0]))
def ab(m, s):
"""convert mean and std to shape params for the beta distribution"""
a = ((1 - m) / (s ** 2) - 1 / m) * m ** 2
b = a * (1 / m - 1)
return a, b
def gab(m, s):
"""
shape and scale of the gamma distribution
Args:
m: mean of the sample
s: standard deviation of the sample
Returns:
"""
a = (1.0 * m / s) ** 2
b = (s ** 2) / (1.0 * m)
return a, b
def to_unity(values):
s = 1. * sum(values)
return [i / s for i in values]
def cohens_d(means=(), stdevs=(), N_samples=()):
"""
cohens_d giving the effect size between group 1 and group 2
see http://en.wikipedia.org/wiki/Effect_size#Cohen.27s_d
:param means: tuple with (M1,M2)
:param stdevs: tuple with (V1,V2)
:param N_samples: tuple with (N1,N2)
:return: float, effect size
"""
m = means
s = stdevs
n = N_samples
S = sqrt(((n[0] - 1) * s[0] ** 2 + (n[1] - 1) * s[1] ** 2) / (n[0] + n[1] - 2))
return (m[0] - m[1]) / S
def assess_sample_differences(sample1, sample2, power=None, sig=0.05):
z = gamma.isf
pass
def exhausted(ballot, eliminated):
"""
ballot is a string of letters, each letter corresponding to a candidate
eliminated is a string of letters, each corresponding to an eliminated candidate
this function returns true if the ballot does not rank any continuing candidate,
contains an overvote at the highest continuing ranking or contains 2 or more
sequential skipped rankings before its highest continuing ranking.
Exhausted ballots are not counted for any continuing candidate.
"""
is_exhausted = False
remaining_candidates = [item for item in ballot if item not in eliminated]
# does the ballot fail to rank any continuing candidate?
if len(remaining_candidates) == 0 or set(remaining_candidates) == {' '}:
is_exhausted = True
# does the ballot contain an overvote at the highest continuing ranking?
# WE CAN'T TEST THIS WITH THE STRING SCHEME SINCE THERE IS ONLY ONE POSITION PER RANK
# does the ballot contain 2 or more sequential skipped rankings before its
# highest continuing ranking?
if remaining_candidates[0:2] == [' ', ' ']:
is_exhausted = True
# get rid of any blanks
remaining_candidates = ''.join([item for item in remaining_candidates if item != ' '])
return is_exhausted, remaining_candidates
class RankedRace:
def __init__(self, ballots):
"""
A RankedRace takes a list of ballots, where each ballot is a string of letters,
each letter corresponding to a candidate (or a blank space) in a rank position.
"""
self.ballots = ballots
self.candidates = set(chain.from_iterable(ballots)) - {' '}
self.to_eliminate = '' # all the candidates eliminated so far
self.eliminated = '' # candidate eliminated in the last round
self.results = {candidate: 0 for candidate in self.candidates}
for ballot in self.ballots:
is_exhausted, remaining_candidates = exhausted(ballot, self.to_eliminate)
if not is_exhausted:
# count the top candidate on the ballot
candidate = remaining_candidates[0]
self.results[candidate] += 1
# this stores the results from the candidate who was eliminated in the prior round
self.intermediate = {}
self.round_number = 0
self.total_ballots = len(ballots)
def run(self):
"""
Run one round of the race vote following the algorithm laid out in the proposed
legislation. A round eliminates any candidate with too few rank-1 votes to win.
"""
# if any one candidate has a majority, then don't continue
for candidate, votes in self.results.iteritems():
if votes > self.total_ballots / 2.:
self.results = {candidate: votes}
return
# recalculate the results every time
self.results = {candidate: 0 for candidate in self.candidates if candidate not in self.to_eliminate}
self.intermediate = {}
for ballot in self.ballots:
is_exhausted, remaining_candidates = exhausted(ballot, self.to_eliminate)
if not is_exhausted:
# count the top candidate on the ballot
candidate = remaining_candidates[0]
self.results[candidate] += 1
# how do the intermediate results get redistributed?
if self.to_eliminate not in ['', ' ']:
current_vote = remaining_candidates[0]
candidate_indices = {}
for idx, c in enumerate(ballot):
if c not in candidate_indices and c != ' ':
candidate_indices[c] = idx
current_idx = candidate_indices[current_vote]
try:
eliminated_idx = candidate_indices[self.eliminated]
except KeyError:
# if the eliminated candidate is not on this ballot ...
continue
try:
if self.eliminated == 'S' and ballot[0] == 'S':
stuff = 1
if eliminated_idx < current_idx:
self.intermediate[current_vote] += 1
except KeyError:
if self.eliminated == 'S' and ballot[0] == 'S':
stuff = 1
self.intermediate[current_vote] = 1
if self.eliminated == 'S':
stuff = 1
# find the last place candidate
lowest_votes = min([v for c, v in self.results.iteritems()])
lowest_candidate = ''
for candidate, votes in self.results.iteritems():
if votes == lowest_votes:
lowest_candidate = candidate
self.results.pop(lowest_candidate)
self.to_eliminate += lowest_candidate
self.eliminated = lowest_candidate
self.round_number += 1
def elect(self):
"""
Run all the required rounds to elect a winner following the proposed legislation.
"""
while len(self.results) > 1:
self.run()
return self.results
def order_prob(ballots):
candidates = list(set(chain.from_iterable(ballots)) - {' '})
order_counts = {}
for perm_tuple in permutations(candidates, 2):
perm = ''.join(perm_tuple)
order_counts[perm] = 0
for ballot in ballots:
# if the same candidates are in the permutation as the ballot
# same_candidates = ''.join([item for item in ballot if item in perm])
# and if they are in the same order
# if same_candidates == perm:
# if the pair is a substring in the ballot, that is the second candidate immediately follows the first
if perm in ballot:
order_counts[perm] += 1
# calculate the fraction of the total for each permutation
total = 1. * sum([v for k, v in order_counts.iteritems()])
order_probs = {}
for k, v in order_counts.iteritems():
order_probs[k] = round(v / total, 3)
return order_probs
def bootstrap(bs_function, list, niters=1000):
"""
bootstrap runs a bootstrapping algorithm on a generic function. Returns
an array of bootstrapped results of length [niters].
bootstrap takes a function that takes a single parameter, which is a list.
The list is the second argument and it is randomly sampled from with
replacement [niters] times. If the function takes more than 1 argument,
it must be curried (see http://stackoverflow.com/a/1352865/347815)
beforehand to run this function.
The results are of length [niters] and consist of the result from each
bootstrapping iteration. The mean of these results should converge to the
true mean of the data, and the standard deviation of these results should
converge to the standard error of the data.
"""
results = []
for i in range(niters):
# choose the data randomly
rdata = np.random.choice(list, size=1, replace=True, p=None)
res = bs_function(rdata)
results.append(res)
return results
def ibootstrap_sample(data, niters):
"""
Generates [nsamples] samples by selecting with replacement from [thelist],
returning a new list of the same size as the original on each iteration.
"""
array_list = np.array(data)
indices = np.random.randint(0, len(data), (len(data), niters))
for si in range(niters):
yield array_list[indices[:, si]]
def simulation(candidates, order_distributions):
ballots = []
all_candidates = [k for k in candidates]
# create a ballot for each vote in the candidates dictionary
# the 1st spot is taken by the given candidate, then the latter
# ranks are filled according to the preference probabilities
for candidate in candidates:
my_pair_prefs = {k[1]: dist for k, dist in order_distributions.iteritems() if k[0] == candidate}
others = my_pair_prefs.keys()
p = to_unity([my_pair_prefs[o].rvs() for o in others])
for i in range(candidates[candidate]['votes']):
latter = np.random.choice(others, size=4, p=p)
ballot = [candidate]
ballot.extend(latter)
ballots.append(''.join(ballot))
return ballots
def bootstrap(values):
random_samples = ibootstrap_sample(values, 1000)
order_samples = {k:[] for k in order_prob(values).keys()}
for sample in random_samples:
for k, v in order_prob(sample).iteritems():
order_samples[k].append(v)
return order_samples
def make_distributions(order_samples, values):
order_distributions = {k:None for k in order_prob(values).keys()}
for k, v in order_samples.iteritems():
pair_probabilities = np.array(v)
expected = pair_probabilities.mean()
stdev = pair_probabilities.std()
order_distributions[k] = ss.beta(*ab(expected, stdev))
return order_distributions
if __name__ == '__main__':
import pandas as pd
col_names = [u'Timestamp',
u'Username',
u'2010_1',
u'2010_2',
u'2010_3',
u'2010_4',
u'2010_5',
u'2014_1',
u'2014_2',
u'2014_3',
u'affiliation',
u'follow_up']
df = pd.read_csv('../data/processed/Responses_to_RCV_form_no_email.csv', sep='|', names=col_names, header=0)
candidate_dict = {'Libby Mitchell (Democrat)': 'M',
'Eliot Cutler (Independent)': 'C',
'Paul LePage (Republican)': 'L',
'Kevin Scott (Independent)': 'S',
'Shawn Moody (Independent)': 'D'}
def summarize_ranks_2010(row):
"""
Take the list of candidates names and return a string of letters
E.g., 'LCMSD'
"""
candidate_dict = {'Libby Mitchell (Democrat)': 'M',
'Eliot Cutler (Independent)': 'C',
'Paul LePage (Republican)': 'L',
'Kevin Scott (Independent)': 'S',
'Shawn Moody (Independent)': 'D'}
cols = []
for i in range(1, 6):
cols.append('2010_{}'.format(str(i)))
summary = []
for c in cols:
try:
abbr = candidate_dict[row[c]]
except KeyError:
abbr = ' '
summary.append(abbr)
return ''.join(summary)
df['2010_ranks'] = df.apply(summarize_ranks_2010, axis=1)
order_samples_2010 = bootstrap(df['2010_ranks'].values)
order_distributions_2010 = make_distributions(order_samples_2010, df['2010_ranks'].values)
candidates_2010 = {'L': {'name': 'LePage', 'votes': 218605},
'M': {'name': 'Mitchell', 'votes': 109387},
'C': {'name': 'Cutler', 'votes': 208270},
'D': {'name': 'Moody', 'votes': 28756},
'S': {'name': 'Scott', 'votes': 5664},
}
ballots = simulation(candidates_2010, order_distributions_2010)
r = RankedRace(ballots)
links = [['source','target','value']]
while len(r.results) > 1:
print
print '---Round {} results---'.format(r.round_number+1)
for c in r.results:
links.append([
'{0}{1}'.format(candidates_2010[c]['name'], r.round_number),
'{0}{1}'.format(candidates_2010[c]['name'], r.round_number+1),
r.results[c]
])
print '{0}: {1:,}'.format(candidates_2010[c]['name'], r.results[c])
elim = r.eliminated
r.run()
try:
print '{0} was eliminated in round {1}. Votes were re-allocated as follows:'.format(candidates_2010[elim]['name'], r.round_number-1)
for a in r.intermediate:
links.append([
'{0}{1}'.format(candidates_2010[elim]['name'], r.round_number-1),
'{0}{1}'.format(candidates_2010[a]['name'], r.round_number),
r.intermediate[a]
])
print '\t{0}: {1:,}'.format(candidates_2010[a]['name'], r.intermediate[a])
except KeyError:
continue
winner, abbr = [[k, v] for k, v in candidate_dict.iteritems() if v in r.results.keys()][0]
votes = r.results[abbr]
print
print 'The winner is {0} with {1:,} votes.'.format(winner, votes)
@jakeemerson
Copy link
Author

This is what the output looks like...

`
---Round 1 results---
Cutler: 208,270
Mitchell: 109,387
Scott: 5,664
Moody: 28,756
LePage: 218,605

---Round 2 results---
Cutler: 208,270
Mitchell: 109,387
Moody: 28,756
LePage: 218,605
Scott was eliminated in round 1. Votes were re-allocated as follows:
Cutler: 412
Mitchell: 463
Moody: 2,107
LePage: 2,682

---Round 3 results---
Cutler: 208,682
Mitchell: 109,850
LePage: 221,287
Moody was eliminated in round 2. Votes were re-allocated as follows:
Cutler: 6,439
Mitchell: 6,067
LePage: 16,087

---Round 4 results---
Cutler: 215,121
LePage: 237,374
Mitchell was eliminated in round 3. Votes were re-allocated as follows:
Cutler: 89,004
LePage: 16,825

The winner is Eliot Cutler (Independent) with 304,125 votes.
`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment