Last active
February 25, 2017 07:15
-
-
Save suminb/b787df14281b03871eda44d515666d7a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import random
import sys
from operator import attrgetter

import click
from logbook import Logger, StreamHandler
# Send every log record emitted by this process to standard error.
StreamHandler(sys.stderr).push_application()

# Module-level logger used by the functions in this file.
log = Logger(__name__)
# Root click command group; sub-commands attach to it via ``@cli.command()``.
# Deliberately has no docstring: click would use it as the group's help text.
@click.group()
def cli():
    return None
class Candidacy(object):
    """A leader's bid for a candidate, ranked by a preference value.

    The three constructor arguments are stored unchanged as the attributes
    ``leader``, ``candidate`` and ``preference``.
    """

    def __init__(self, leader, candidate, preference):
        self.leader, self.candidate, self.preference = (
            leader, candidate, preference)

    def __repr__(self):
        template = '<{} -> {} ({})>'
        return template.format(self.leader, self.candidate, self.preference)

    def is_conflict(self, other):
        """Return whether `other` bids for the same candidate with the same
        preference as this bid (the leaders are not compared)."""
        if self.candidate != other.candidate:
            return False
        return self.preference == other.preference
def find_conflicts(candidate_list):
    """Split off the group of entries that conflict with the last entry.

    The last element of `candidate_list` is taken as the reference. Every
    element directly preceding it that conflicts with it (according to
    ``is_conflict``) is removed from the list together with the reference.
    The scan stops at the first non-conflicting element, so the list is
    expected to be sorted such that conflicting entries are adjacent.

    `candidate_list` is modified in place.

    Returns:
        A ``(conflicts, candidate_list)`` tuple. ``conflicts`` holds the
        reference followed by the entries conflicting with it, or is empty
        when nothing conflicts with the reference (in which case the
        reference is put back at the end of the list). An empty input
        yields ``([], candidate_list)``.
    """
    # Nothing to inspect; avoid popping from an empty list.
    if not candidate_list:
        return [], candidate_list

    first = candidate_list.pop()
    conflicts = [first]
    # Peek at the tail instead of pop-and-push-back; stop at the first
    # entry that does not conflict with the reference.
    while candidate_list and first.is_conflict(candidate_list[-1]):
        conflicts.append(candidate_list.pop())

    # A "group" of one is not a conflict: restore the reference entry.
    if len(conflicts) == 1:
        candidate_list.append(conflicts.pop())
    return conflicts, candidate_list
def resolve_conflict_random(r1, r2):
    """Return a two-element list holding `r1` and `r2` in random order."""
    shuffled = list((r1, r2))
    random.shuffle(shuffled)
    return shuffled
def resolve_conflict_manual(conflicts):
    """Ask the user which of the conflicting entries wins.

    The entries are printed with their index, then the user is prompted
    repeatedly until a valid index is entered.

    Args:
        conflicts: non-empty list of conflicting entries.

    Returns:
        A ``(winner, losers)`` tuple, where ``losers`` is a list of all
        other entries in their original order.

    Raises:
        ValueError: if `conflicts` is empty (there would be no valid answer
            and the prompt would repeat forever).
    """
    if not conflicts:
        raise ValueError('Cannot resolve an empty list of conflicts')

    print('Conflict found between:')
    for i, c in enumerate(conflicts):
        print(' {}: {}'.format(i, c))

    valid_choices = range(len(conflicts))
    choice = None
    while choice not in valid_choices:
        try:
            choice = int(input('Resolve conflict [0-n]: '))
        except ValueError:
            # Not a number: ask again.
            continue

    winner = conflicts[choice]
    # Select losers by position rather than by comparison, so that entries
    # which happen to compare equal to the winner are still reported.
    losers = [c for i, c in enumerate(conflicts) if i != choice]
    return winner, losers
def shift_preferences(candidate_list, leader):
    """Move every pending entry of `leader` up by one preference level.

    The shift only happens when `leader` has entries and the smallest
    preference among them is greater than 1; otherwise `candidate_list`
    itself is returned untouched. When a shift happens, a new list is
    returned in which the leader's entries are replaced by fresh
    ``Candidacy`` objects with ``preference - 1``; other entries are reused.
    """
    own_preferences = [
        entry.preference for entry in candidate_list
        if entry.leader == leader]
    # Nothing pending for this leader, or already at the top level.
    if not own_preferences or min(own_preferences) <= 1:
        return candidate_list

    shifted = []
    for entry in candidate_list:
        if entry.leader == leader:
            entry = Candidacy(
                entry.leader, entry.candidate, entry.preference - 1)
        shifted.append(entry)
    return shifted
def parse_input_file(input_file):
    """Yield ``(leader, candidate, preference)`` tuples from a TSV file.

    The first line is treated as a header and skipped. Each following line
    is stripped and split on tab characters; lines that do not have exactly
    three columns, or whose third column is not an integer, are reported
    through the module logger and skipped.

    Args:
        input_file: an open text file object.

    Yields:
        ``(str, str, int)`` tuples, one per valid line.
    """
    # Consume the header row; its content is not used.
    input_file.readline()

    for line in input_file:
        line = line.strip()
        cols = line.split('\t')
        if len(cols) != 3:
            log.warn('Invalid input: {}', line)
            continue

        leader, candidate, preference = cols
        try:
            preference = int(preference)
        except ValueError:
            # Treat a malformed preference like any other malformed line
            # instead of aborting the whole run.
            log.warn('Invalid input: {}', line)
            continue
        yield leader, candidate, preference
def print_results(records, format='console'):
    """Print `records` ordered by leader in the requested output format.

    ``'csv'`` selects the CSV printer; any other value falls back to the
    console printer.
    """
    by_leader = list(records)
    by_leader.sort(key=attrgetter('leader'))
    printer = print_results_csv if format == 'csv' else print_results_console
    printer(by_leader)
def print_results_csv(records):
    """Write `records` to standard output as CSV with a header row.

    Each record contributes one row built from its ``leader``,
    ``candidate`` and ``preference`` attributes. Rows are written with the
    ``csv`` module so that fields containing commas or quotes are quoted
    properly; lines end with a plain newline, as ``print`` would produce.
    """
    writer = csv.writer(sys.stdout, lineterminator='\n')
    writer.writerow(['Leader', 'Member', 'Preference'])
    for r in records:
        writer.writerow([r.leader, r.candidate, r.preference])
def print_results_console(records):
    """Print a ``Confirmed List:`` heading followed by one record per line."""
    lines = ['Confirmed List:']
    lines.extend(str(record) for record in records)
    print('\n'.join(lines))
def confirm_candidacy(confirmed_list, candidacy, max_group_members):
    """Append `candidacy` to `confirmed_list` unless its leader's group is full.

    A group is full when `confirmed_list` already holds at least
    `max_group_members` entries with the same ``leader``; in that case the
    entry is dropped and a debug message is logged instead.
    """
    group_size = sum(
        1 for member in confirmed_list
        if member.leader == candidacy.leader)
    if group_size < max_group_members:
        confirmed_list.append(candidacy)
        return
    log.debug('{} group has been fulfilled'.format(candidacy.leader))
@cli.command()
@click.argument('input_file', type=click.File('r'))
@click.option('-n', '--max-group-members', type=int, default=4)
@click.option('-f', '--format', default='console', help='Output format.')
@click.option('--shift', is_flag=True, type=bool, default=False,
              help='Upon losing competition, automatically shifts up all '
                   'pending preferences.')
def run(input_file, max_group_members, format, shift):
    """Assign candidates to leaders based on the bids in INPUT_FILE."""
    # Parameters (filled in by click):
    #   input_file        -- open tab-separated file, parsed by
    #                        parse_input_file().
    #   max_group_members -- maximum number of confirmed entries per leader.
    #   format            -- output format handed to print_results().
    #   shift             -- when set, each loser of a conflict has its
    #                        pending preferences moved up via
    #                        shift_preferences().
    sort_key = attrgetter('preference', 'candidate')

    candidate_list = [Candidacy(*x) for x in parse_input_file(input_file)]
    confirmed_list = []

    # Sorted in descending order and consumed from the end, so the lowest
    # preference value is handled first and entries with the same
    # (preference, candidate) sit next to each other for find_conflicts().
    sorted_list = sorted(candidate_list, key=sort_key, reverse=True)

    while sorted_list:
        conflicts, sorted_list = find_conflicts(sorted_list)
        if conflicts:
            # All conflicting entries share one candidate. If that candidate
            # was confirmed earlier, nobody can win; skip the prompt the
            # same way the single-entry branch below skips taken candidates.
            contested = conflicts[0].candidate
            if any(r.candidate == contested for r in confirmed_list):
                for c in conflicts:
                    log.debug('{} is already taken'.format(c))
                continue

            winner, losers = resolve_conflict_manual(conflicts)
            confirm_candidacy(confirmed_list, winner, max_group_members)
            if shift:
                for loser in losers:
                    sorted_list = shift_preferences(sorted_list, loser.leader)
                # Shifting changes preference values, so restore the order
                # that find_conflicts() relies on.
                sorted_list = sorted(sorted_list, key=sort_key, reverse=True)
        else:
            c1 = sorted_list.pop()
            if any(r.candidate == c1.candidate for r in confirmed_list):
                log.debug('{} is already taken'.format(c1))
            else:
                confirm_candidacy(confirmed_list, c1, max_group_members)

    print_results(confirmed_list, format)
# Script entry point: hand control to the click command group.
if __name__ == '__main__':
    cli()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
click>=6.7
logbook>=1.0.0
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment