Skip to content

Instantly share code, notes, and snippets.

@baileyparker
Last active January 25, 2018 18:47
Embed
What would you like to do?
MetaBrainz chatbox voting plugin cleanup for codereview.stackexchange.com
from functools import wraps
from inspect import ismethod
class PersistentDecorator(object):
    """Proxy that persists the wrapped object after each method call.

    Public attribute access is forwarded to the wrapped object.  Plain
    attributes (anything that is not a bound method) are returned as-is.
    Bound methods are returned wrapped so that, once the call has returned
    normally, ``save`` is invoked with the wrapped object's ``to_json()``
    output.
    """

    def __init__(self, decorated_object, save):
        """Remember the proxied object and the persistence callback.

        decorated_object -- object to proxy; it must provide ``to_json()``.
        save -- callable that receives the result of ``to_json()``.
        """
        self._decorated_object = decorated_object
        self._save = save

    def __getattribute__(self, attr):
        """Resolve ``attr`` on the proxy itself or on the wrapped object."""
        # Underscore-prefixed names (including the two attributes assigned
        # in __init__) belong to the proxy.  Resolving them the normal way
        # is also what keeps the lookups below from recursing forever.
        if attr.startswith('_'):
            return super(PersistentDecorator, self).__getattribute__(attr)

        value = getattr(self._decorated_object, attr)
        if not ismethod(value):
            return value

        @wraps(value)
        def wrap(*args, **kwargs):
            return_value = value(*args, **kwargs)
            # Only reached when the call above did not raise.
            self._save(self._decorated_object.to_json())
            return return_value

        return wrap
# -*- coding: utf-8 -*-
from collections import namedtuple
from functools import wraps
import re
from poll import Poll, OptionError
from persistent_decorator import PersistentDecorator
from ..base import BasePlugin
from ..decorators import (listens_to_all, listens_to_command,
listens_to_regex_command)
# Key under which the serialized poll is kept in the plugin's storage.
STORAGE_KEY = 'data'

# Message prefixes that count as a vote: a plus/minus sign, or a thumbs
# up/down emoji (plain, then with each of the five skin-tone modifiers).
# The emoji had been corrupted into mojibake and are restored here.
SUPPORT_INDICATORS = ('+', '👍', '👍🏻', '👍🏼', '👍🏽', '👍🏾', '👍🏿')
OPPOSITION_INDICATORS = ('-', '👎', '👎🏻', '👎🏼', '👎🏽', '👎🏾', '👎🏿')
class Plugin(BasePlugin):
    """Chat plugin that runs one poll at a time.

    Users start a poll with !startpoll, vote with !support / !oppose /
    !abstain (or by prefixing a message with a vote indicator), inspect the
    result with !tallypoll and finish with !endpoll.  The poll is written to
    the plugin's storage under STORAGE_KEY whenever it changes.
    """

    @listens_to_command('startpoll')  # (1)
    def handle_start_command(self, line, option_names):  # (2)
        """Handles the !startpoll command.

        When issued with no arguments, starts a binary poll. Optional
        arguments are the option names in the poll. For example:

            <user1> !startpoll
            <BrainzBot> A new poll has started. You can vote with !support,
                        !oppose, !abstain, or by starting your message with
                        a +1 or -1. See a tally of votes with !tallypoll and
                        end the poll with !endpoll

            <user1> !startpoll delicious_fruits vegetables
            <BrainzBot> A new poll has started between delicious_fruits and
                        vegetables. You can vote with !support [option],
                        !oppose [option], !abstain, or by starting your
                        message with a +[option] or -[option]. See a tally
                        of votes with !tallypoll and end the poll with
                        !endpoll

        The command will report an error if a vote is ongoing, or if exactly
        one option name is given.
        """
        if self.current_poll is not None:
            return u'A poll is already ongoing. Issue !endpoll to end it.'

        if len(option_names) == 1:
            return u'Add 2 or more options to a poll or issue !startpoll ' \
                   'for a binary poll'

        self.current_poll = Poll.create(option_names)

        # No names at all means a binary (+1/-1) poll; two or more names
        # means a poll between those named options.
        if not option_names:
            return u'A new poll has started. You can vote with !support, ' \
                   u'!oppose, !abstain, or by starting your message with a ' \
                   u'+1 or -1. See a tally of votes with !tallypoll and ' \
                   u'end the poll with !endpoll'

        response = u"A new poll has started between {}. You can vote " \
                   u"with !support [option], !oppose [option], " \
                   u"!abstain, or by starting your message with a " \
                   u"+[option] or -[option]. See a tally of votes " \
                   u"with !tallypoll and end the poll with !endpoll"
        return response.format(self._english_list(option_names))  # (3)

    @staticmethod
    def _english_list(names):
        """Join names as English prose: "a and b" or "a, b, and c".

        Kept as a private helper because no ``english_list`` function is
        defined or imported by this module.
        """
        names = list(names)
        if len(names) <= 2:
            return u' and '.join(names)
        return u'{}, and {}'.format(u', '.join(names[:-1]), names[-1])

    def requires_ongoing_poll(func):  # (4)
        """Decorator: answer with a hint instead of calling ``func`` when
        there is no active poll."""
        @wraps(func)
        def wrapped(self, *args, **kwargs):
            if self.current_poll is None:
                return u'No active poll. Start one with !startpoll'
            return func(self, *args, **kwargs)
        return wrapped

    @listens_to_command('support')
    @requires_ongoing_poll
    def handle_support_command(self, line, args):
        """Handles !support [option]: record line.user as a supporter."""
        if len(args) > 1:
            return u'Support expects either no arguments or one argument.'

        # args supports len(), so it is indexed rather than consumed with
        # next(), which only works on iterators.
        partial_option_name = args[0] if args else None
        try:
            self.current_poll.support(line.user, partial_option_name)
        except OptionError as e:
            # OptionError passes its message to Exception.__init__, so
            # args[0] is that message on both Python 2 and 3.
            return e.args[0]

    @listens_to_command('oppose')
    @requires_ongoing_poll
    def handle_oppose_command(self, line, args):
        """Handles !oppose [option]: record line.user as an opposer."""
        if len(args) > 1:
            return u'Oppose expects either no arguments or one argument.'

        partial_option_name = args[0] if args else None
        try:
            self.current_poll.oppose(line.user, partial_option_name)
        except OptionError as e:
            return e.args[0]

    # Regex alternation matching any single vote indicator.
    INDICATORS = r'|'.join(map(re.escape,
                               SUPPORT_INDICATORS + OPPOSITION_INDICATORS))

    # Python spells named groups (?P<name>...).  The option is a run of word
    # characters (\w+) so that "+1" and "+fruits" both match.
    @listens_to_all(r'^(?P<indicator>' + INDICATORS + r')(?P<option>\w+)')
    def handle_message_with_possible_vote(self, line, indicator, option):
        """Treats a message such as "+1" or "-fruits" as a vote."""
        if self.current_poll is None:
            # Must not have been a vote
            return

        try:
            if indicator in SUPPORT_INDICATORS:
                self.current_poll.support(line.user, option)
            else:
                self.current_poll.oppose(line.user, option)
        except OptionError as e:
            return e.args[0]

    @listens_to_command('abstain')
    @requires_ongoing_poll
    def handle_abstain_command(self, line, args):
        """Handles !abstain: record line.user as abstaining."""
        if len(args) > 0:
            return u'Abstain expects no arguments. Issue just !abstain'

        self.current_poll.abstain(line.user)

    @listens_to_command('tallypoll')
    @requires_ongoing_poll
    def handle_tally_command(self, line, args):
        """Handles !tallypoll: report the current votes."""
        if len(args) > 0:
            return u'Tally expects no arguments. Issue just !tallypoll'

        # NOTE: tally() returns a Tally whose string form is the report
        return str(self.current_poll.tally())

    @listens_to_command('endpoll')
    @requires_ongoing_poll
    def handle_endvote(self, line, args):
        """Handles !endpoll: close the poll, reporting any untallied votes."""
        if len(args) > 0:
            return u'Endpoll expects no arguments. Issue just !endpoll'

        tally = u''
        if self.current_poll.changed_since_last_tally:
            # Votes changed since the last !tallypoll, so show a final tally.
            tally = u"{} ".format(self.current_poll.tally())

        self.current_poll = None
        return u"{}Poll ended.".format(tally)

    @property
    def current_poll(self):
        """The active poll wrapped in a PersistentDecorator, or None."""
        # If _current_poll doesn't exist, a poll may be in storage
        if not hasattr(self, '_current_poll'):
            serialized = self.retrieve(STORAGE_KEY)
            # An empty value is what the setter writes when a poll ends.
            if serialized:
                self._current_poll = \
                    self._wrap_poll(Poll.from_json(serialized))
            else:
                self._current_poll = None

        return self._current_poll

    @current_poll.setter
    def current_poll(self, poll):
        if poll is None:
            # Ending a poll: there is nothing to wrap or serialize.  An
            # empty value is stored because only store/retrieve are known
            # to exist on the base class from this module.
            self._current_poll = None
            self.store(STORAGE_KEY, u'')
            return

        self._current_poll = self._wrap_poll(poll)
        self.store(STORAGE_KEY, poll.to_json())

    def _wrap_poll(self, poll):
        """Wrap ``poll`` so each method call re-saves it under STORAGE_KEY."""
        def save(serialized):
            self.store(STORAGE_KEY, serialized)

        return PersistentDecorator(poll, save)
from collections import namedtuple
import json
import operator
# Name of the single implicit option in a binary (+1 / -1) poll.
BINARY_OPTION_NAME = '1'


class Poll(object):
    """Votes for one poll.

    Every option maps to a ``(supporters, opposers)`` pair of user sets.
    Abstaining users are tracked in a separate set.  A flag records whether
    any vote changed since ``tally()`` was last called.
    """

    @classmethod
    def create(cls, option_names):
        """Return a new, empty poll over ``option_names``.

        An empty ``option_names`` creates a binary poll whose only option
        is BINARY_OPTION_NAME.
        """
        # Support a binary poll with no named options
        if not option_names:
            option_names = [BINARY_OPTION_NAME]

        options = {option: (set(), set()) for option in option_names}
        return cls(options, set(), changed_since_last_tally=True)

    @classmethod
    def from_json(cls, serialized):
        """Rebuild a poll from a string produced by ``to_json()``."""
        data = json.loads(serialized)
        options = {name: (set(supporters), set(opposers))
                   for name, (supporters, opposers)
                   in data['options'].items()}
        abstains = set(data['abstains'])
        changed_since_last_tally = data['changed_since_last_tally']
        return cls(options, abstains, changed_since_last_tally)

    def __init__(self, options, abstains, changed_since_last_tally):
        """Store the poll state.

        options -- dict of option name -> (supporter set, opposer set).
        abstains -- set of abstaining users.
        changed_since_last_tally -- whether votes changed since tally().
        """
        self._options = options
        self._abstains = abstains
        self._changed_since_last_tally = changed_since_last_tally

    def support(self, user, partial_option_name=None):
        """Record ``user`` as supporting the option; may raise OptionError."""
        supporters, opposers = self._get_option(partial_option_name)

        if user in supporters:
            # No changes necessary
            return

        self._clear_vote(user)
        # discard(), unlike remove(), is a no-op for a user who never
        # opposed this option.
        opposers.discard(user)
        supporters.add(user)
        self._changed_since_last_tally = True

    def oppose(self, user, partial_option_name=None):
        """Record ``user`` as opposing the option; may raise OptionError."""
        supporters, opposers = self._get_option(partial_option_name)

        if user in opposers:
            # No changes necessary
            return

        self._clear_vote(user)
        supporters.discard(user)
        opposers.add(user)
        self._changed_since_last_tally = True

    def abstain(self, user):
        """Record ``user`` as abstaining, dropping any votes they cast."""
        if user in self._abstains:
            # No changes necessary
            return

        # Clear any other votes the user has cast
        for supporters, opposers in self._options.values():
            supporters.discard(user)
            opposers.discard(user)

        self._abstains.add(user)
        self._changed_since_last_tally = True

    def _clear_vote(self, user):
        """Withdraw the user's abstention before an explicit vote is added.

        NOTE(review): votes the user cast on *other* options are left in
        place; confirm that voting on several options is intended.
        """
        self._abstains.discard(user)

    def _get_option(self, partial_option_name):
        """Return the ``(supporters, opposers)`` pair for an option.

        ``partial_option_name`` may be any prefix of an option name; the
        first match in sorted order wins.  ``None`` selects the only option
        of a single-option poll.  Raises OptionError when nothing matches.
        """
        if partial_option_name is None:
            if len(self._options) != 1:
                raise OptionError(partial_option_name, sorted(self._options))

            # Whatever it is named, the sole option is the one meant.
            (only_option,) = self._options.values()
            return only_option

        # Consider options in sorted order, finding the first that has
        # partial_option_name as a prefix to support partially typed option
        # names
        for option_name in sorted(self._options):
            if option_name.startswith(partial_option_name):
                return self._options[option_name]

        raise OptionError(partial_option_name, sorted(self._options))

    def tally(self):
        """Return a Tally of the votes and mark the poll as tallied.

        Options and user names are sorted so the report is deterministic.
        """
        self._changed_since_last_tally = False

        by_name = sorted(self._options.items(), key=operator.itemgetter(0))
        options = [Option(name, sorted(supporters), sorted(opposers))
                   for name, (supporters, opposers) in by_name]
        return Tally(options, sorted(self._abstains))

    @property
    def changed_since_last_tally(self):
        """True when a vote changed since ``tally()`` was last called."""
        return self._changed_since_last_tally

    def to_json(self):
        """Serialize the poll to a JSON string readable by ``from_json()``."""
        # json cannot encode sets, so every user set is written as a list.
        options = {name: [sorted(supporters), sorted(opposers)]
                   for name, (supporters, opposers) in self._options.items()}
        data = {'options': options,
                'abstains': sorted(self._abstains),
                'changed_since_last_tally': self._changed_since_last_tally}
        return json.dumps(data)
class Tally(namedtuple('Tally', ('options', 'abstains'))):
    """Snapshot of a poll's votes.

    options -- sequence of per-option results; each is rendered with str().
    abstains -- sequence of names of abstaining users.
    """

    __slots__ = ()

    def __str__(self):
        """Render every option followed by the abstention summary."""
        # NOTE: Python 3.6 f-strings would make this much cleaner
        options = u' '.join(map(str, self.options))
        abstaining = u"[abstain ({num}): {names}]".format(
            num=len(self.abstains), names=u', '.join(self.abstains))
        return u'{} {}'.format(options, abstaining)
class Option(namedtuple('Option', ('name', 'supporters', 'opposers'))):
    """Votes for one poll option.

    name -- the option's name.
    supporters -- sequence of names of users supporting the option.
    opposers -- sequence of names of users opposing the option.
    """

    __slots__ = ()

    def __str__(self):
        """Render as ``[name(+S, -O): supporters; opposers]``."""
        # NOTE: Python 3.6 f-strings would make this much cleaner
        names = u'; '.join(u', '.join(name_list)
                           for name_list in (self.supporters, self.opposers))
        # {name} is a named field, so the name must be passed by keyword.
        return u"[{name}(+{num_supporters}, -{num_opposers}): {names}]" \
            .format(name=self.name, num_supporters=len(self.supporters),
                    num_opposers=len(self.opposers), names=names)
class OptionError(Exception):
    """Raised when a vote does not identify an option of the poll.

    Attributes:
        message -- user-facing explanation (also the exception's only arg).
        bad_name -- the option name that failed to match.
        options -- the option names that were available.
    """

    def __init__(self, bad_name, options):
        if len(options) == 1:
            msg = u'poll is binary (try +1 or -1, instead of a named option)'
        else:
            # Sorted so the message does not depend on the caller's ordering.
            msg = u'expected one of the following options: {}' \
                .format(u', '.join(sorted(options)))

        super(OptionError, self).__init__(msg)
        # Exceptions have no built-in ``message`` attribute on Python 3, and
        # callers read it, so it is set explicitly.
        self.message = msg
        self.bad_name = bad_name
        self.options = options
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment