Created
May 12, 2013 07:40
-
-
Save anonymous/5562759 to your computer and use it in GitHub Desktop.
This is the source code for the original Klimek simulations. This code is very unoptimized to aid the reader understand what is going on. Originally Numpy wasn't going to be used, but it turns out Numpy has a very good function to draw from a binomial distribution. This could not be replicated without Numpy, so it was included. The rest of the c…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author notes: I tried my best to not use numpy or scipy, in attempt to keep things simple and understandable | |
# However, numpy does provide the very excellent binomial function, of which Python doesn't come with | |
# Whilst the rest of the code could be written using Numpy in a far more efficient manner | |
# It's been kept simple for those unfamiliar. | |
# All you need to know is binomial() performs a binomial trial with a probability | |
from __future__ import division | |
from numpy.random import binomial | |
import csv | |
import sys | |
import os | |
import math | |
import random | |
import bisect | |
winningRatioPeakDict = { | |
'SPR.csv' : { | |
'low': 0.35 , | |
'mid': 0.45 , | |
'high': 0.55, | |
'mean': 0.4919155, | |
'median': 0.5013012, | |
}, | |
'SPR_P_only.csv': { | |
'low': 0.45 , | |
'mid': 0.55 , | |
'high': 0.65, | |
'mean': 0.4951695, | |
'median': 0.4816654 | |
}, | |
'SPR_N_only.csv':{ | |
'low': 0.35 , | |
'mid': 0.45 , | |
'high': 0.55, | |
'mean': 0.4878147, | |
'median': 0.496434 | |
} | |
} | |
turnoutRatioPeakDict = { | |
'SPR.csv': { | |
'low': 0.85, | |
'mid': 0.87, | |
'high': 0.89, | |
'mean': 0.8455734, | |
'median': 0.8535894 | |
}, | |
'SPR_P_only.csv': { | |
'low': 0.775, | |
'mid': 0.825, | |
'high': 0.875, | |
'mean': 0.8354881, | |
'median': 0.8477099 | |
}, | |
'SPR_N_only.csv': { | |
'low': 0.85, | |
'mid': 0.87, | |
'high': 0.89, | |
'mean': 0.85, | |
'median': 0.8558019 | |
}, | |
} | |
try: | |
WINNING_RATIO_PEAK = winningRatioPeakDict[sys.argv[1]][sys.argv[3]] # argv[1] is filename, argv[3] takes 'high', 'low', 'mid', 'mean', 'median' | |
except: | |
WINNING_RATIO_PEAK = winningRatioPeakDict['SPR.csv']['mean'] | |
try: | |
TURNOUT_RATIO_PEAK = turnoutRatioPeakDict[sys.argv[1]][sys.argv[3]] | |
except: | |
TURNOUT_RATIO_PEAK = turnoutRatioPeakDict['SPR.csv']['mean'] | |
try: | |
FI = float(sys.argv[4]) | |
FE = float(sys.argv[5]) | |
except: | |
FI = 0 | |
FE = 0 | |
def readFile(): | |
f = open(sys.argv[1]) | |
electorates = [] | |
reader = csv.DictReader(f) | |
for row in reader: | |
d = {} | |
# manual coercion | |
for k,v in row.items(): | |
try: | |
d[k] = float(v) | |
except ValueError: | |
d[k] = v | |
electorates.append(d) | |
return electorates | |
def getLeftSD(electorates, peak=WINNING_RATIO_PEAK): | |
# get all electorates left of the peak | |
newList = [] | |
for electorate in electorates: | |
if electorate['WinningRatio'] < peak: | |
newList.append(electorate) | |
totalVoters = sum([e['RegisteredVoters'] for e in newList]) | |
expectedWinningRatio = peak # mean can be used as well | |
vi = [e['WinningRatio'] for e in newList] | |
leftVariance = map(lambda x: (x - expectedWinningRatio) ** 2, vi) | |
try: | |
leftSD = math.sqrt((sum(leftVariance)/len(leftVariance))) | |
except ZeroDivisionError: | |
return 0 # hack! | |
return leftSD | |
def getRightSD(electorates, peak=WINNING_RATIO_PEAK): | |
# get all electorates right of the peak | |
newList = [] | |
for electorate in electorates: | |
if electorate['WinningRatio'] > peak: | |
newList.append(electorate) | |
totalVoters = sum([e['RegisteredVoters'] for e in newList]) | |
expectedWinningRatio = peak # mean can be used as well | |
vi = [e['WinningRatio'] for e in newList] | |
rightVariance = map(lambda x: (x - expectedWinningRatio) ** 2, vi) | |
rightSD = math.sqrt((sum(rightVariance)/len(rightVariance))) | |
return rightSD | |
def getTurnoutSD(electorates, peak = TURNOUT_RATIO_PEAK): | |
a = [e['TurnoutRatio'] for e in electorates] | |
turnoutVariance = map(lambda x: (x - peak) ** 2, a) | |
turnoutSD = math.sqrt((sum(turnoutVariance)/len(turnoutVariance))) | |
return turnoutSD | |
def incrementalFraud(rightSD, turnOut, registeredVoters, votedForWinning): | |
fraudIntensity = 0 | |
while fraudIntensity <= 0 or fraudIntensity > 1: | |
fraudIntensity = random.gauss(0, rightSD) | |
fraudulentVoteCount = registeredVoters * (turnOut * votedForWinning + fraudIntensity * (1 - turnOut) + fraudIntensity * (1 - votedForWinning) * turnOut) | |
return int(fraudulentVoteCount) | |
def extremeFraud(registeredVoters, votedForWinning): | |
fraudIntensity = 0 | |
while fraudIntensity <= 0 or fraudIntensity > 1: | |
fraudIntensity = 1 - random.gauss(1, 0.075) # klimek et al discovered 0.075 describes all well | |
oppositionVotes = registeredVoters - votedForWinning | |
additionalFraudVotes = binomial(n=oppositionVotes, p=fraudIntensity) | |
return int(additionalFraudVotes) | |
def simulate(electorates, fi=FI, fe=FE, turnoutPeak = TURNOUT_RATIO_PEAK, winningPeak=WINNING_RATIO_PEAK): | |
random.seed(os.urandom(16)) | |
turnoutSD = getTurnoutSD(electorates) | |
leftSD = getLeftSD(electorates) | |
rightSD = getRightSD(electorates) | |
simulatedElectorates = [] | |
for electorate in electorates: | |
simulatedElectorate = {} | |
originalVotedForWinning = electorate['VotedForWinning'] | |
registeredVoters = electorate['RegisteredVoters'] | |
turnoutRatio = random.gauss(turnoutPeak, turnoutSD) | |
winningRatio = random.gauss(winningPeak, leftSD) | |
willIncrementalFraudHappen = binomial(n=1, p=fi) | |
willExtremeFraudHappen = binomial(n=1, p=fe) | |
newVotedForWinning = originalVotedForWinning | |
if willIncrementalFraudHappen: # binomial returns a 1 | |
newVotedForWinning = incrementalFraud(rightSD, turnoutRatio, registeredVoters, winningRatio) | |
if willExtremeFraudHappen: | |
additionalFraudVotes = extremeFraud(registeredVoters, winningRatio) | |
newVotedForWinning += additionalFraudVotes | |
simulatedElectorate = { | |
'SEAT': electorate['SEAT'], | |
'RegisteredVoters': registeredVoters, | |
'VoterTurnout': int(turnoutRatio * registeredVoters), # can't expect 0.5 of a person | |
'TurnoutRatio': turnoutRatio, | |
'VotedForWinning': newVotedForWinning, | |
'WinningRatio': newVotedForWinning/registeredVoters, | |
} | |
try: | |
simulatedElectorate['VoteRate (Rescaled)'] = math.log(math.sqrt((registeredVoters - newVotedForWinning) ** 2)/newVotedForWinning) | |
except: | |
simulatedElectorate['VoteRate (Rescaled)'] = "ERROR" # yet another too tired to find out why hack | |
simulatedElectorates.append(simulatedElectorate) | |
return simulatedElectorates | |
if __name__ == '__main__': | |
electorates = readFile() | |
simulatedElectorates = simulate(electorates) | |
fieldNames = simulatedElectorates[0].keys() | |
g = open(sys.argv[2], 'w') | |
writer = csv.DictWriter(g, fieldnames=fieldNames) | |
writer.writerow(dict(zip(fieldNames, fieldNames))) | |
for electorate in simulatedElectorates: | |
writer.writerow(electorate) | |
g.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment