Skip to content

Instantly share code, notes, and snippets.

@mie00
Created October 25, 2016 16:39
Show Gist options
  • Save mie00/6f3dc731e9a9d08a2d53bcdbd2d4f1b4 to your computer and use it in GitHub Desktop.
Save mie00/6f3dc731e9a9d08a2d53bcdbd2d4f1b4 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from twx.botapi import TelegramBot, send_message
import time
from collections import Counter
polls = {}
def aggregate_votes(votes):
return '\n'.join('%s: %s' % (k,v) for k,v in sorted(Counter(votes.values()).items(), key=lambda x:x[1]))
def create_poll(name, owner, vals=None):
if name in polls:
raise ValueError()
polls[name] = {'owner': owner, 'votes': {}, 'vals': vals}
def vote_poll(name, user, val):
vals = polls[name]['vals']
if vals is None or val in vals:
polls[name]['votes'][user] = val
else:
raise ValueError()
def status_poll(name):
return name, aggregate_votes(polls[name]['votes'])
def close_poll(name, user):
assert polls[name]['owner'] == user
poll = polls[name]
del polls[name]
return name, aggregate_votes(poll['votes'])
def get_owner(name):
return polls[name]['owner']
def get_values(name):
return polls[name]['vals']
def list_polls():
return polls.keys()
bot = TelegramBot('bottoken')
bot.update_bot_info().wait()
updates = bot.get_updates().wait()
user_id = None
lastid = updates[-1].update_id + 1 if updates else None
while True:
for update in updates:
try:
text = update.message.text.strip()
args = text.split()
user = update.message.sender.username
if args[0] == '/create':
try:
create_poll(args[1], user, None if len(args) == 1 else args[2].split(','))
bot.send_message(update.message.chat.id, 'poll with name %s created successfully' % args[1])
except ValueError:
bot.send_message(update.message.chat.id, 'poll with name %s already exists' % args[1])
elif args[0] == '/vote':
try:
vote_poll(args[1], user, args[2])
except ValueError:
bot.send_message(update.message.chat.id, '@%s: value %s is invalid available values are %s' % (user, args[2], ' '.join(get_values(name))))
elif args[0] == '/status':
name, votes = status_poll(args[1])
bot.send_message(update.message.chat.id, 'name: %s\n%s' % (name, votes))
elif args[0] == '/close':
try:
name, votes = close_poll(args[1], user)
bot.send_message(update.message.chat.id, 'name: %s\n%s' % (name, votes))
except AssertionError:
bot.send_message(update.message.chat.id, '%s is the only one who can close the poll' % (get_owner(args[1])))
elif args[0] == '/list':
bot.send_message(update.message.chat.id, '\n'.join(list_polls()))
elif args[0] == '/help':
bot.send_message(update.message.chat.id, """/create <poll name> [comma seperated values for available values]
/vote <poll name> <vote>
/close <poll name>
/list
/help""")
except KeyError:
bot.send_message(update.message.chat.id, 'there is no poll with the name %s' % (args[1]))
except:
pass
time.sleep(1)
lastid = updates[-1].update_id + 1 if updates else lastid
if lastid:
updates = bot.get_updates(offset=lastid).wait()
else:
updates = bot.get_updates().wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment