Skip to content

Instantly share code, notes, and snippets.

@suminb
Last active February 25, 2017 07:15
Show Gist options
  • Save suminb/b787df14281b03871eda44d515666d7a to your computer and use it in GitHub Desktop.
Save suminb/b787df14281b03871eda44d515666d7a to your computer and use it in GitHub Desktop.
from operator import attrgetter
import random
import sys
import click
from logbook import Logger, StreamHandler
# Install a logbook stream handler bound to stderr via push_application().
# Results are written with print() elsewhere in this file, presumably so
# that log output and results stay on separate streams.
StreamHandler(sys.stderr).push_application()
# Module-level logger shared by the functions in this file.
log = Logger(__name__)
# Root click command group. Sub-commands attach themselves with
# `@cli.command()`; the group body itself does nothing.
# (Kept as a `#` comment on purpose: click shows a function docstring as
# the command's help text, so adding one would change `--help` output.)
@click.group()
def cli():
    pass
class Candidacy(object):
    """One leader's request for one candidate, tagged with a preference.

    The three constructor arguments are stored unchanged as the attributes
    ``leader``, ``candidate`` and ``preference``.
    """

    def __init__(self, leader, candidate, preference):
        self.leader, self.candidate, self.preference = (
            leader, candidate, preference)

    def __repr__(self):
        template = '<{} -> {} ({})>'
        return template.format(self.leader, self.candidate, self.preference)

    def is_conflict(self, other):
        """Tell whether `other` asks for the same candidate at the same
        preference as this entry (the leader is not compared)."""
        if self.candidate != other.candidate:
            return False
        return self.preference == other.preference
def find_conflicts(candidate_list):
    """Extract the run of mutually conflicting entries at the tail of a list.

    The last element of `candidate_list` is used as the reference entry.
    Working backwards from the end, elements are removed for as long as
    ``reference.is_conflict(element)`` is true. For this to find every
    conflict, the caller must pass a list in which conflicting entries are
    adjacent (e.g. sorted by preference and candidate).

    `candidate_list` is modified in place and is also returned, so callers
    may keep using either reference.

    Returns:
        A ``(conflicts, candidate_list)`` tuple. ``conflicts`` contains the
        reference entry followed by every entry that conflicts with it. It
        is empty when the reference entry has no conflict (or the list is
        empty); in that case nothing is removed from `candidate_list`.
    """
    # Nothing to inspect: report "no conflict" instead of failing on pop().
    if not candidate_list:
        return [], candidate_list

    first = candidate_list.pop()
    conflicts = [first]
    # Peek at the tail so a non-conflicting entry never has to be pushed back.
    while candidate_list and first.is_conflict(candidate_list[-1]):
        conflicts.append(candidate_list.pop())

    if len(conflicts) == 1:
        # The reference entry stands alone: restore it and report no conflict.
        candidate_list.append(first)
        return [], candidate_list
    return conflicts, candidate_list
def resolve_conflict_random(r1, r2):
    """Return the two given records as a list in random order."""
    contenders = [r1, r2]
    # shuffle() reorders the list in place and returns None.
    random.shuffle(contenders)
    return contenders
def resolve_conflict_manual(conflicts):
    """Ask the user on the console which conflicting entry should win.

    The entries are listed with their index, then the user is prompted
    repeatedly until a valid index is entered.

    Args:
        conflicts: Non-empty sequence of conflicting entries.

    Returns:
        A ``(winner, losers)`` tuple, where ``losers`` is a list of every
        entry except the chosen one, in their original order.

    Raises:
        ValueError: If `conflicts` is empty, since no index could ever be
            valid and the prompt would repeat forever.
    """
    if not conflicts:
        raise ValueError('no conflicting entries to choose from')

    print('Conflict found between:')
    for i, c in enumerate(conflicts):
        print(' {}: {}'.format(i, c))

    choice = None
    while choice is None or not 0 <= choice < len(conflicts):
        try:
            choice = int(input('Resolve conflict [0-n]: '))
        except ValueError:
            # Not a number: keep asking.
            choice = None

    winner = conflicts[choice]
    # Exclude by position, not by equality, so that other entries which
    # happen to compare equal to the winner are still reported as losers.
    losers = [c for i, c in enumerate(conflicts) if i != choice]
    return winner, losers
def shift_preferences(candidate_list, leader):
    """Move every entry of `leader` one preference step up, if possible.

    When `leader` has no entries in `candidate_list`, or its smallest
    preference value is already 1 or lower, the very same list object is
    returned untouched. Otherwise a new list is returned in which each of
    the leader's entries is replaced by a fresh ``Candidacy`` whose
    preference is decreased by one; other entries are carried over as-is.
    """
    own_preferences = [
        entry.preference for entry in candidate_list
        if entry.leader == leader]
    if not own_preferences or min(own_preferences) <= 1:
        return candidate_list

    shifted = []
    for entry in candidate_list:
        if entry.leader == leader:
            entry = Candidacy(
                entry.leader, entry.candidate, entry.preference - 1)
        shifted.append(entry)
    return shifted
def parse_input_file(input_file):
    """Yield ``(leader, candidate, preference)`` tuples from a TSV file.

    The first line is treated as a header row and discarded. Every other
    line is stripped of surrounding whitespace and split on tabs; it must
    have exactly three columns, the third of which parses as an integer.

    Blank lines are skipped silently. Any other line that does not match
    the expected shape is reported through ``log.warn`` and skipped, so a
    single bad row does not abort the whole run.

    Args:
        input_file: An open text file object positioned at the header row.
    """
    input_file.readline()  # discard the header row
    for line in input_file:
        line = line.strip()
        if not line:
            continue
        cols = line.split('\t')
        if len(cols) != 3:
            log.warn('Invalid input: {}', line)
            continue
        try:
            preference = int(cols[2])
        except ValueError:
            # A non-numeric preference is just another malformed row.
            log.warn('Invalid input: {}', line)
            continue
        yield cols[0], cols[1], preference
def print_results(records, format='console'):
    """Print `records` ordered by leader in the requested output format.

    ``format='csv'`` selects the CSV printer; any other value selects the
    console printer.
    """
    by_leader = sorted(records, key=attrgetter('leader'))
    printer = print_results_csv if format == 'csv' else print_results_console
    printer(by_leader)
def print_results_csv(records):
    """Write `records` to stdout as CSV with a header row.

    Each record contributes one ``leader,candidate,preference`` row below
    the ``Leader,Member,Preference`` header. Rows are produced by the
    ``csv`` module so that values containing commas, quotes or newlines
    are quoted and remain a single field; plain values are emitted exactly
    as a simple comma join would.
    """
    import csv  # local import: only needed for this output format

    # '\n' matches what print() emits, instead of csv's default '\r\n'.
    writer = csv.writer(sys.stdout, lineterminator='\n')
    writer.writerow(['Leader', 'Member', 'Preference'])
    for r in records:
        writer.writerow([r.leader, r.candidate, r.preference])
def print_results_console(records):
    """Print a 'Confirmed List:' heading followed by one record per line."""
    print('Confirmed List:')
    for text in map(str, records):
        print(text)
def confirm_candidacy(confirmed_list, candidacy, max_group_members):
    """Append `candidacy` to `confirmed_list` unless its leader's group is full.

    The group size is the number of entries already in `confirmed_list`
    with the same leader. Once that reaches `max_group_members` the entry
    is not added and a debug message is logged instead. The list is
    modified in place; nothing is returned.
    """
    group_size = sum(
        1 for entry in confirmed_list if entry.leader == candidacy.leader)
    if group_size < max_group_members:
        confirmed_list.append(candidacy)
        return
    log.debug('{} group has been fulfilled'.format(candidacy.leader))
@cli.command()
@click.argument('input_file', type=click.File('r'))
@click.option('-n', '--max-group-members', type=int, default=4)
@click.option('-f', '--format', default='console', help='Output format.')
@click.option('--shift', is_flag=True, type=bool, default=False,
              help='Upon losing competition, automatically shifts up all '
                   'pending preferences.')
def run(input_file, max_group_members, format, shift):
    """Assign candidates to leaders from a tab-separated preference file.

    Conflicts between leaders who want the same candidate at the same
    preference are resolved interactively.
    """
    candidate_list = [Candidacy(*x) for x in parse_input_file(input_file)]
    confirmed_list = []
    # Sorted in descending order so that list.pop() (which takes from the
    # end) hands out the smallest preference value first, and entries with
    # the same preference and candidate sit next to each other.
    sorted_list = sorted(candidate_list,
                         key=attrgetter('preference', 'candidate'),
                         reverse=True)

    def is_taken(candidate):
        # True once some confirmed entry already holds this candidate.
        return any(r.candidate == candidate for r in confirmed_list)

    while sorted_list:
        conflicts, sorted_list = find_conflicts(sorted_list)
        if conflicts:
            # Every entry in `conflicts` targets the same candidate. If that
            # candidate was already confirmed earlier, nobody can win it:
            # drop the entries instead of prompting and double-assigning.
            if is_taken(conflicts[0].candidate):
                for c in conflicts:
                    log.debug('{} is already taken'.format(c))
                continue

            winner, losers = resolve_conflict_manual(conflicts)
            confirm_candidacy(confirmed_list, winner, max_group_members)
            if shift:
                # NOTE(review): shifted entries keep their list position, so
                # the list is no longer strictly sorted afterwards and later
                # conflicts between shifted entries may not be adjacent.
                # Confirm whether a re-sort is wanted here.
                for loser in losers:
                    sorted_list = shift_preferences(sorted_list, loser.leader)
        else:
            c1 = sorted_list.pop()
            if is_taken(c1.candidate):
                log.debug('{} is already taken'.format(c1))
            else:
                confirm_candidacy(confirmed_list, c1, max_group_members)

    print_results(confirmed_list, format)
# Invoke the click command group only when this file is executed as a
# script, not when it is imported.
if __name__ == '__main__':
    cli()
click>=6.7
logbook>=1.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment