Skip to content

Instantly share code, notes, and snippets.

@DominikPeters
Created January 29, 2023 12:55
Show Gist options
  • Save DominikPeters/2208ca4c7c1464bc1d3956829195f20a to your computer and use it in GitHub Desktop.
Simulation code for the likelihood of ties when computing the Method of Equal Shares, on approval data from Pabulib
import csv
import os
from tqdm import *
import random
from fractions import Fraction
import math
def parse(filename):
    """Parse a Pabulib-style election file into the inputs used by the simulation.

    The file is read as a semicolon-delimited CSV. A row whose first cell is
    "meta", "projects" or "votes" (case-insensitive, surrounding whitespace
    ignored) starts a section; the row right after it is that section's header.
    Every following row is keyed by its first cell.

    Args:
        filename: path of the file to read (opened as UTF-8).

    Returns:
        A dict with the keys
          "N"       - list of voter ids, in file order,
          "C"       - sorted list of project ids,
          "profile" - voter id -> list of approved project ids
                      (the "vote" column split on ","),
          "cost"    - project id -> integer cost,
          "budget"  - integer budget,
        or None (after printing a message) when the file cannot be parsed,
        a required column is missing or non-numeric, or no budget is given.
    """
    meta = {}
    projects = {}
    votes = {}
    with open(filename, 'r', newline='', encoding="utf-8") as csvfile:
        try:
            section = ""
            header = []
            reader = csv.reader(csvfile, delimiter=';')
            for row in reader:
                if not row:
                    # A blank line carries no data; skip it instead of
                    # failing on row[0] and discarding the whole file.
                    continue
                marker = row[0].strip().lower()
                if marker in ("meta", "projects", "votes"):
                    section = marker
                    header = next(reader)
                elif section == "meta":
                    meta[row[0]] = row[1].strip()
                elif section == "projects":
                    projects[row[0]] = {
                        key.strip(): row[it + 1].strip()
                        for it, key in enumerate(header[1:])
                    }
                elif section == "votes":
                    votes[row[0]] = {
                        key.strip(): row[it + 1].strip()
                        for it, key in enumerate(header[1:])
                    }
        except Exception:
            # Best effort: any malformed file is skipped, but unlike a bare
            # `except:` this lets KeyboardInterrupt / SystemExit propagate.
            print(f"CSV parsing failed for {filename}. Skipping..")
            return None

    try:
        N = list(votes)
        C = sorted(projects)
        profile = {i: votes[i]["vote"].split(",") for i in N}
        cost = {c: int(projects[c]["cost"]) for c in C}
    except (KeyError, ValueError):
        # Missing "vote"/"cost" column or a cost that is not an integer.
        print(f"Python conversion during parsing failed for {filename}. Skipping..")
        return None

    try:
        raw_budget = meta["budget"]
        if "," in raw_budget:
            # Keep the part before the first comma and add one
            # (presumably a decimal comma, i.e. rounding up - TODO confirm).
            B = int(raw_budget.split(",")[0]) + 1
        else:
            B = int(raw_budget)
    except (KeyError, ValueError):
        print(f"No budget specified during parsing for {filename}. Skipping..")
        return None

    return {
        "N": N,
        "C": C,
        "profile": profile,
        "cost": cost,
        "budget": B,
    }
def exhaustive(C, cost, budget, committee):
    """Tell whether `committee` is within budget and cannot be extended.

    Args:
        C: all candidate ids.
        cost: mapping candidate id -> cost.
        budget: total amount that may be spent.
        committee: the selected candidate ids.

    Returns:
        False if the committee costs more than `budget`, or if some
        candidate outside the committee satisfies
        ``cost + committee cost < budget - 0.1``; True otherwise.
    """
    spent = sum(cost[member] for member in committee)
    if spent > budget:
        return False
    # A candidate only counts as "still addable" when it stays strictly
    # below the budget minus a 0.1 slack.
    threshold = budget - 0.1
    return not any(
        cost[candidate] + spent < threshold
        for candidate in C
        if candidate not in committee
    )
def cost_equal_shares(N, C, cost, approval_input, initial_budget):
    """Run the Method of Equal Shares and report whether a tie occurred.

    Each voter starts with ``initial_budget / len(N)``. In every round the
    affordable candidate with the smallest rho is selected, where
    ``rho * cost`` is the largest per-voter payment: approvers pay that
    amount, or their whole remaining budget if it is smaller. Candidates with
    equal rho are ordered by lower cost, then by more approvers.

    Args:
        N: voter ids.
        C: candidate ids.
        cost: mapping candidate id -> cost.
        approval_input: mapping candidate id -> iterable of approving voters.
        initial_budget: total budget, split equally among the voters.

    Returns:
        ``(committee, found_tie)`` where `committee` is the sorted tuple of
        selected candidates and `found_tie` is True if, in some round, another
        candidate matched the round's winner on rho, cost and number of
        approvers, so that the tie-breaking rules above could not separate them.
    """
    budget = {i: Fraction(initial_budget, len(N)) for i in N}
    committee = []
    approvers = {c: list(approval_input[c]) for c in C}
    # Throw away dummy candidates (free or without any approver).
    remaining = {c for c in C if cost[c] > 0 and len(approvers[c]) > 0}
    # Lower bound on each candidate's rho: budgets only shrink, so the rho
    # computed in an earlier round can never be beaten in a later one.
    previous_rho = {c: Fraction(1, len(approvers[c])) for c in remaining}
    found_tie = False
    while True:
        best = None
        best_rho = float('inf')
        best_cost = None
        best_votes = None
        tie_this_round = False
        for c in sorted(remaining, key=lambda c: previous_rho[c]):
            if previous_rho[c] > best_rho:
                # Best possible rho for this round still isn't good enough;
                # all later candidates in the ordering are at least as bad.
                break
            if sum(budget[i] for i in approvers[c]) < cost[c]:
                # Unaffordable now and forever: can delete.
                remaining.remove(c)
                continue
            approvers[c].sort(key=lambda i: Fraction(budget[i], cost[c]))
            supporters = approvers[c]
            num_votes = len(supporters)
            paid_so_far = 0
            for j, voter in enumerate(supporters):
                # Equal split of what is left among the not-yet-exhausted voters.
                rho = Fraction(cost[c] - paid_so_far, cost[c] * (num_votes - j))
                if rho * cost[c] <= budget[voter]:
                    # Found best rho for this candidate.
                    previous_rho[c] = rho
                    if rho < best_rho or \
                            (rho == best_rho and cost[c] < best_cost) or \
                            (rho == best_rho and cost[c] == best_cost
                             and num_votes > best_votes):
                        best_rho = rho
                        best = c
                        best_cost = cost[c]
                        best_votes = num_votes
                        # A strictly better candidate supersedes earlier ties.
                        tie_this_round = False
                    elif rho == best_rho and cost[c] == best_cost \
                            and num_votes == best_votes:
                        # Only a full match on rho, cost and votes is a tie;
                        # a candidate that is simply worse must not count.
                        tie_this_round = True
                    break
                paid_so_far += budget[voter]
        if tie_this_round:
            found_tie = True
        if best is None:
            break
        committee.append(best)
        remaining.remove(best)
        for i in approvers[best]:
            budget[i] -= min(budget[i], best_rho * cost[best])
    return tuple(sorted(committee)), found_tie
def cost_equal_shares_completed(N, C, cost, approval_input, budget):
    """Check whether Equal Shares with budget completion runs into a tie.

    The rule is re-run with a virtual budget of 100%, 101%, ..., 200% of
    `budget` (rounded down). The search stops as soon as a run reports a tie,
    the outcome is exhaustive with respect to the real budget, or the outcome
    costs more than the real budget.

    Args:
        N: voter ids.
        C: candidate ids.
        cost: mapping candidate id -> cost.
        approval_input: mapping candidate id -> iterable of approving voters.
        budget: the real budget.

    Returns:
        True if some run encountered a tie before the completion stopped,
        False otherwise (including when all budget increments are used up).
    """
    for percent in range(100, 201):
        initial_budget = math.floor(budget * percent / 100)
        committee, was_tied = cost_equal_shares(N, C, cost, approval_input, initial_budget)
        if was_tied:
            return True
        if exhaustive(C, cost, budget, committee):
            return False  # completed without having encountered a tie
        if sum(cost[c] for c in committee) > budget:
            return False  # completed without having encountered a tie
    # Every increment was tried without a tie; return a bool rather than
    # falling off the end with an implicit None.
    return False
# Directory containing the election files to simulate on.
folder = "20230128190215_pabulib_approval/"
# Per file: number of voter samples (out of 100) in which two equal-cost
# projects received the same number of votes.
tie_frequency = {}
# Per file: number of those samples in which running Equal Shares (with
# completion) actually encountered a tie.
equal_shares_tie_frequency = {}

for fileID in tqdm(os.listdir(folder)):
    filename = os.path.join(folder, fileID)
    record = parse(filename)
    if not record:
        continue
    N = record["N"]
    cost = record["cost"]
    profile = record["profile"]
    # Order projects by cost so that equal-cost projects end up adjacent.
    C = sorted(record["C"], key=lambda c: cost[c])

    # 100 random voter subsets, each containing 30%-80% of the electorate.
    sampledNs = [
        random.sample(N, random.randint(int(0.3 * len(N)), int(0.8 * len(N))))
        for _ in range(100)
    ]
    sampledVoteCounts = []
    for sampledN in sampledNs:
        counts = {c: 0 for c in C}
        for i in sampledN:
            for c in profile[i]:
                counts[c] += 1
        sampledVoteCounts.append(counts)

    tie_frequency[fileID] = 0
    equal_shares_tie_frequency[fileID] = 0
    tied_samples = []
    for previous, c in zip(C, C[1:]):
        if cost[c] == cost[previous]:
            for num, sampledVoteCount in enumerate(sampledVoteCounts):
                if sampledVoteCount[c] == sampledVoteCount[previous]:
                    tie_frequency[fileID] += 1
                    tied_samples.append(num)
            # NOTE(review): only the first equal-cost pair is examined, which
            # keeps the count at most 100 (one per sample) - confirm intent.
            break

    for num in tied_samples:
        # Run equal shares to see if indeed tied.
        N_ = sampledNs[num]
        approval_input = {c: [i for i in N_ if c in profile[i]] for c in C}
        encounters_tie = cost_equal_shares_completed(N_, C, cost, approval_input, record["budget"])
        if encounters_tie:
            equal_shares_tie_frequency[fileID] += 1

if tie_frequency:
    print("Overall frequency of ties:", sum(tie_frequency.values())/(len(tie_frequency)*100))
    print("Overall frequency of ties in equal shares:", sum(equal_shares_tie_frequency.values())/(len(equal_shares_tie_frequency)*100))
else:
    # Avoid dividing by zero when no file could be parsed.
    print(f"No parsable files found in {folder}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment