# Gist mie00/6f3dc731e9a9d08a2d53bcdbd2d4f1b4 -- created October 25, 2016 16:39.
# GitHub page notice: this file contains bidirectional Unicode text that may be
# interpreted or compiled differently than what appears below. To review, open
# the file in an editor that reveals hidden Unicode characters.
#!/usr/bin/env python3
import logging
import time
from collections import Counter

from twx.botapi import TelegramBot, send_message
# In-memory registry of open polls, keyed by poll name.  Each entry is a dict
# with 'owner' (the user who created the poll), 'votes' (user -> chosen value)
# and 'vals' (list of allowed values, or None when any value is accepted).
polls = {}
def aggregate_votes(votes):
    """Summarise votes as one ``value: count`` line per distinct value.

    Args:
        votes: mapping of voter -> chosen value.

    Returns:
        The lines joined with newlines, ordered from the least to the most
        voted value.  An empty mapping yields an empty string.
    """
    tally = Counter(votes.values())
    ranked = sorted(tally.items(), key=lambda item: item[1])
    return '\n'.join('%s: %s' % (value, count) for value, count in ranked)
def create_poll(name, owner, vals=None):
    """Register a new, empty poll.

    Args:
        name: unique poll name, used as the key in ``polls``.
        owner: user creating the poll; only this user may close it later.
        vals: optional list of allowed vote values; ``None`` accepts any value.

    Raises:
        ValueError: a poll called ``name`` already exists (it is left untouched).
    """
    if name in polls:
        raise ValueError('poll %r already exists' % (name,))
    polls[name] = {'owner': owner, 'votes': {}, 'vals': vals}
def vote_poll(name, user, val):
    """Record ``user``'s vote in poll ``name``.

    A later vote by the same user replaces the earlier one, because votes are
    stored per user.

    Args:
        name: name of an existing poll.
        user: the voter.
        val: the chosen value.

    Raises:
        KeyError: no poll called ``name`` exists.
        ValueError: the poll restricts its values and ``val`` is not among them.
    """
    poll = polls[name]
    allowed = poll['vals']
    if allowed is not None and val not in allowed:
        raise ValueError('%r is not an allowed value for poll %r' % (val, name))
    poll['votes'][user] = val
def status_poll(name):
    """Return ``(name, summary)`` for a poll without closing it.

    ``summary`` is the text produced by ``aggregate_votes`` for the votes cast
    so far.

    Raises:
        KeyError: no poll called ``name`` exists.
    """
    summary = aggregate_votes(polls[name]['votes'])
    return name, summary
def close_poll(name, user):
    """Close poll ``name`` on behalf of ``user`` and return its final tally.

    Args:
        name: name of an existing poll.
        user: the user asking to close it; must be the poll's owner.

    Returns:
        ``(name, summary)`` where ``summary`` is the ``aggregate_votes`` text
        for the votes cast.  The poll is removed from ``polls``.

    Raises:
        KeyError: no poll called ``name`` exists.
        AssertionError: ``user`` is not the owner; the poll stays open.
    """
    poll = polls[name]
    if poll['owner'] != user:
        # Raised explicitly rather than via ``assert`` so the ownership check
        # is not stripped under ``python -O``; the exception type is kept
        # because callers catch AssertionError.
        raise AssertionError('only the owner may close poll %r' % (name,))
    del polls[name]
    return name, aggregate_votes(poll['votes'])
def get_owner(name):
    """Return the owner recorded for poll ``name``.

    Raises:
        KeyError: no poll called ``name`` exists.
    """
    poll = polls[name]
    return poll['owner']
def get_values(name):
    """Return the allowed vote values of poll ``name``.

    The result is whatever was stored when the poll was created: a list of
    values, or ``None`` when the poll accepts any value.

    Raises:
        KeyError: no poll called ``name`` exists.
    """
    poll = polls[name]
    return poll['vals']
def list_polls():
    """Return the names of all open polls (the key view of ``polls``)."""
    return polls.keys()
bot = TelegramBot('bottoken')
bot.update_bot_info().wait()

# Reply sent for /help and for poll commands that are missing arguments.
HELP_TEXT = """/create <poll name> [comma seperated values for available values]
/vote <poll name> <vote>
/status <poll name>
/close <poll name>
/list
/help"""

# Minimum number of arguments each poll command needs after the command word.
_MIN_ARGS = {'/create': 1, '/vote': 2, '/status': 1, '/close': 1}


def _reply(update, text):
    """Send ``text`` to the chat ``update`` came from (the request is not waited on)."""
    bot.send_message(update.message.chat.id, text)


def _handle_update(update):
    """Interpret one update as a bot command and answer it.

    Updates without message text and messages that are not one of the known
    commands are ignored.  A poll command with too few arguments is answered
    with the help text.
    """
    text = getattr(update.message, 'text', None)
    if not text:
        return
    args = text.split()
    if not args:
        return
    command, params = args[0], args[1:]

    if command == '/help':
        _reply(update, HELP_TEXT)
        return
    if command == '/list':
        # Telegram refuses empty messages, so say so when nothing is open.
        _reply(update, '\n'.join(list_polls()) or 'there are no open polls')
        return
    if command not in _MIN_ARGS:
        return
    if len(params) < _MIN_ARGS[command]:
        _reply(update, HELP_TEXT)
        return

    # NOTE(review): presumably ``username`` can be missing for some senders, in
    # which case they would all share one identity here -- confirm against the
    # Telegram User object and consider keying on the sender id instead.
    user = update.message.sender.username
    name = params[0]
    try:
        if command == '/create':
            # The value list is optional: without it the poll accepts any value.
            vals = params[1].split(',') if len(params) > 1 else None
            try:
                create_poll(name, user, vals)
            except ValueError:
                _reply(update, 'poll with name %s already exists' % name)
            else:
                _reply(update, 'poll with name %s created successfully' % name)
        elif command == '/vote':
            choice = params[1]
            try:
                vote_poll(name, user, choice)
            except ValueError:
                _reply(update, '@%s: value %s is invalid available values are %s'
                       % (user, choice, ' '.join(get_values(name))))
        elif command == '/status':
            _reply(update, 'name: %s\n%s' % status_poll(name))
        elif command == '/close':
            try:
                closed = close_poll(name, user)
            except AssertionError:
                _reply(update, '%s is the only one who can close the poll'
                       % (get_owner(name)))
            else:
                _reply(update, 'name: %s\n%s' % closed)
    except KeyError:
        # The poll helpers raise KeyError when the named poll does not exist.
        _reply(update, 'there is no poll with the name %s' % (name))


def _fetch_updates(offset=None):
    """Fetch pending updates, starting at ``offset`` when one is known.

    Returns an empty list when the request did not produce a sequence of
    updates, so the polling loop can simply try again.
    """
    request = bot.get_updates(offset=offset) if offset else bot.get_updates()
    batch = request.wait()
    # NOTE(review): presumably wait() hands back a list of updates on success
    # and None or an error object on failure -- confirm against twx.botapi.
    if isinstance(batch, (list, tuple)) and all(
            hasattr(item, 'update_id') for item in batch):
        return batch
    return []


updates = _fetch_updates()
lastid = None
while True:
    for update in updates:
        try:
            _handle_update(update)
        except Exception:
            # One bad update must not take the bot down: record it and move
            # on.  KeyboardInterrupt/SystemExit are deliberately not caught.
            logging.exception('failed to handle update %r',
                              getattr(update, 'update_id', None))
    if updates:
        # Ask only for updates newer than the last one processed.
        lastid = updates[-1].update_id + 1
    time.sleep(1)
    updates = _fetch_updates(lastid)
# GitHub page footer: sign up for free to join this conversation on GitHub,
# or sign in to comment if you already have an account.