Skip to content

Instantly share code, notes, and snippets.

@yelizariev
Last active November 10, 2021 12:48
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save yelizariev/304562fc9a5b1e219fc57dda3f3d8ddd to your computer and use it in GitHub Desktop.
Save yelizariev/304562fc9a5b1e219fc57dda3f3d8ddd to your computer and use it in GitHub Desktop.
TODO-list bot for Telegram. OBSOLETE: latest version is here https://itpp.dev/chat/todo-bot/index.html

Allows to create TODOs for a small group of users.

Tasks can have on of the following states:

  • TODO -- to be done
  • DONE -- done
  • CANCELED -- nothing was done and not going to be done
  • WAITING -- cannot be started and waits for something

Technical specification

  • /mytasks, /tasks_from_me -- shows tasks. By default it shows only WAITING and TODO tasks

    • Prints all tasks in a single message with button "Load more Done", "Load more WAITING", "Load more Canceled"
  • /t123 -- shows specific task.

    • Prints original forwarded messages
    • Prints replies
    • You can change status from here
  • /users -- returns list of users for current chat. It's used to specify list of available users to assign the tasks

  • /update_id -- current update_id. Can be used to set MIN_UPDATE_ID (see below)

To create new task:

  • Forward message to the bot
  • Assign to a user from the list

To discuss a task:

  • Reply to a message from the bot

Create a bot

https://telegram.me/botfather -- follow instruction to set bot name and get bot token

Prepare zip file

To make a deployment package execute following commands::

cd /path/to/new/folder

pip2 install pyTelegramBotAPI -t .
wget https://gist.githubusercontent.com/yelizariev/304562fc9a5b1e219fc57dda3f3d8ddd/raw/lambda_function.py -O lambda_function.py
zip -r /tmp/todo_bot_package.zip *

Create DynamoDB table

  • Partition key: id (number)
  • Unmark [ ] Use default settings checkbox

Add Secondary index:

  • Partition key: from_id (number)
  • Sort key: task_state (number)
  • Index name: from_id-task_state-index
  • Projected attributes: Include -- then add field description

Add another Secondary index:

  • Partition key: to_id (number)
  • Sort key: task_state (number)
  • Index name: to_id-task_state-index
  • Projected attributes: Include -- then add field description

Create Lambda function

Runtime

Use Python 2.7

Environment variables

  • BOT_TOKEN -- the one you got from BotFather
  • USERS -- skip if you don't know it. Send command to the bot /users from the a group with all users. Then set this variable
  • DYNAMODB_TABLE -- table with tasks
  • LOG_LEVEL -- DEBUG or INFO
  • MIN_UPDATE_ID -- Number to distract from update_id in task's id computation. Use /update_id to get value.

Trigger

User API Gateway. Once you configure it and save, you will see Invoke URL under Atpi Gateway details section

Role

  • The role must allow access to lambda and dynamodb services. The mimimal policies are:

for dynamodb:

{
"Version": "2012-10-17",
"Statement": [
    {
        "Sid": "VisualEditor0",
        "Effect": "Allow",
        "Action": [
            "dynamodb:DescribeReservedCapacity*",
            "dynamodb:List*",
            "dynamodb:DescribeTimeToLive",
            "dynamodb:DescribeLimits"
        ],
        "Resource": "*"
    },
    {
        "Sid": "VisualEditor1",
        "Effect": "Allow",
        "Action": [
            "dynamodb:CreateTable",
            "dynamodb:BatchGet*",
            "dynamodb:PutItem",
            "dynamodb:DescribeTable",
            "dynamodb:Delete*",
            "dynamodb:Get*",
            "dynamodb:BatchWrite*",
            "dynamodb:Scan",
            "dynamodb:Query",
            "dynamodb:DescribeStream",
            "dynamodb:Update*"
        ],
        "Resource": [
         "arn:aws:dynamodb:*:*:table/<TABLE_NAME_HERE>"
         "arn:aws:dynamodb:*:*:table/<TABLE_NAME_HERE>/index/*"
         ]
    }
]
}

for lambda (created automatically somehow)

{
"Version": "2012-10-17",
"Statement": [
    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogGroup",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
        ],
        "Resource": [
            "arn:aws:logs:*:*:*"
        ]
    }
]
}

Register webhook at telegram

via python lib

Execute once in python console::

BOT_TOKEN = "PASTETHETOKEN"
WEB_HOOK = "PASTEAWSWEBHOOK"

import telebot  # https://github.com/eternnoir/pyTelegramBotAPI
bot = telebot.TeleBot(BOT_TOKEN, threaded=False)
bot.set_webhook(WEB_HOOK)

via curl

# TODO pass allowed_updates arg
curl -XPOST https://api.telegram.org/bot<YOURTOKEN>/setWebhook\?url\=YOURAPIGATEWAYURL
// base
// TODO: Use second index for State, remove State from sort
{
// PRIMARY KEY
"id": ID, //= update_id - MIN_UPDATE_ID
// SECONDARY KEY (partition)
// index1
"from_id": USER_ID,
// index2
"to_id": USER_ID,
// SECONDARY KEY (sort)
"task_state": STATE, // STATE: O=TODO, 1=WAITING, 2=DONE, 3=CANCELED
// Projected keys
"description": "Short representation of the TODO"
// Normal keys
"messages": [CHAT_ID + '_' + MESSAGE_ID], // original messages (forwarded or sent to bot)
"replies": [CHAT_ID + '_' + MESSAGE_ID], // discussions
}
import telebot # https://github.com/eternnoir/pyTelegramBotAPI
import os
import logging
import re
import boto3
import json
# Global variabls
FROM_INDEX = 'from_id-task_state-index'
TO_INDEX = 'to_id-task_state-index'
# READ environment variables
BOT_TOKEN = os.environ['BOT_TOKEN']
USERS = os.environ.get('USERS')
if USERS:
USERS = dict(json.loads(USERS))
DYNAMODB_TABLE = os.environ.get('DYNAMODB_TABLE')
LOG_LEVEL = os.environ.get('LOG_LEVEL')
MIN_UPDATE_ID = int(os.environ.get('MIN_UPDATE_ID', 0))
logger = logging.getLogger()
if LOG_LEVEL:
logger.setLevel(getattr(logging, LOG_LEVEL))
RESPONSE_200 = {
"statusCode": 200,
"headers": {},
"body": ""
}
MEDIA = {'sticker': 'send_sticker', 'voice': 'send_voice', 'video': 'send_video', 'document': 'send_document', 'video_note': 'send_video_note'}
TASK_STATE_TODO = 0
TASK_STATE_WAITING = 1
TASK_STATE_DONE = 2
TASK_STATE_CANCELED = 3
def lambda_handler(event, context):
global USERS
logger.debug("Event: \n%s", json.dumps(event))
logger.debug("Context: \n%s", context)
# READ webhook data
# Object Update in json format.
# See https://core.telegram.org/bots/api#update
update = telebot.types.JsonDeserializable.check_json(event["body"])
client = boto3.client('dynamodb')
# PARSE
message = update.get('message')
if not message:
return RESPONSE_200
chat = message.get('chat')
user = message.get('from')
if not USERS:
USERS = {user['id']: user2name(user)}
# Only work with disabled threaded mode. See https://github.com/eternnoir/pyTelegramBotAPI/issues/161#issuecomment-343873014
bot = telebot.TeleBot(BOT_TOKEN, threaded=False)
command, main_text = get_command_and_text(message.get('text', ''))
if not command:
pass
elif command == '/users':
# TODO
bot.send_message(chat['id'], chat['id'], reply_to_message_id=message['message_id'])
return RESPONSE_200
elif command == '/update_id':
bot.send_message(chat['id'], update['update_id'], reply_to_message_id=message['message_id'])
return RESPONSE_200
elif command in ['/mytasks', '/tasks_from_me']:
to_me = command == '/mytasks'
result = get_tasks(
client,
to_me=to_me,
user_id=user['id'],
task_state=TASK_STATE_TODO
)
response = ""
# if to_me:
# response = "My Tasks:\n\n"
# else:
# response = "Tasks from Me:\n\n"
for item_dict in result['Items']:
item = Item().load_from_dict(item_dict)
response += "/t%s:\n%s\n\n" % (item.id, item.description)
bot.send_message(chat['id'], response, reply_to_message_id=message['message_id'])
return RESPONSE_200
elif command.startswith('/t'):
task_id = int(command[2:])
result = get_task(client, task_id)
item = Item().load_from_dict(result['Item'])
for label, array in [("Messages:", item.messages), ("Discussion:", item.replies)]:
if not array:
continue
bot.send_message(chat['id'], label, reply_to_message_id=message['message_id'])
for from_chat_id, msg_id in item.messages:
bot.forward_message(
chat['id'],
from_chat_id=from_chat_id,
message_id=msg_id,
)
bot.send_message(chat['id'], "TODO: add control buttons here")
return RESPONSE_200
# REPLY
if not main_text and not any(message.get(key) for key in MEDIA) and not message.get('photo'):
bot.send_message(chat['id'], "<i>Empty message is ignored</i>", reply_to_message_id=message['message_id'], parse_mode='HTML')
return RESPONSE_200
# Add new task
# TODO: support messages without text
text = message.get('text')
item = Item(
from_user=user,
messages=[(chat['id'], message['message_id'])],
date=message['date'],
update_id=update['update_id'],
description=text,
)
add_task(client, item.to_dict())
bot.send_message(chat['id'], "<i>Task created:</i> /t%s" % item.id, reply_to_message_id=message['message_id'], parse_mode='HTML')
return RESPONSE_200
def get_command_and_text(text):
"""split message into command and main text"""
m = re.match('(/[^ @]*)([^ ]*)(.*)', text, re.DOTALL)
if m:
# group(3) is a bot name
return m.group(1), m.group(3)
else:
return None, text
def user2name(user):
name = user.get('first_name')
if user.get('last_name'):
name += ' ' + user.get('last_name')
return name
# DynamoDB wrappers
def add_task(client, item):
return client.put_item(
TableName=DYNAMODB_TABLE,
Item=item,
)
def get_task(client, id):
return client.get_item(
TableName=DYNAMODB_TABLE,
Key={
'id': Item.elem_to_num(id),
}
)
def get_tasks(client, to_me=True, user_id=None, task_state=None):
# Doc: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.query
args = {
':user_id': Item.elem_to_num(user_id)
}
if to_me:
condition = "to_id = :user_id"
index = TO_INDEX
else:
condition = "from_id = :task_state"
index = FROM_INDEX
if task_state is None:
pass
else:
condition += " and task_state = :task_state"
args[':task_state'] = Item.elem_to_num(task_state)
result = client.query(
TableName=DYNAMODB_TABLE,
IndexName=index,
Select='ALL_PROJECTED_ATTRIBUTES',
KeyConditionExpression=condition,
ExpressionAttributeValues=args,
)
return result
class Item(object):
STR_PARAMS = ['description']
INT_PARAMS = ['id', 'from_id', 'to_id', 'task_state']
def __init__(self, from_user=None, to_user=None, messages=None, task_state=TASK_STATE_TODO, date=None, update_id=None, description=''):
self.from_id = from_user and from_user['id']
self.to_id = to_user and to_user['id']
if not self.to_id:
self.to_id = self.from_id
self.messages = messages
self.task_state = task_state
self.date = date
self.update_id = update_id
if update_id:
self.id = update_id - MIN_UPDATE_ID
self.replies = []
self.description = description
def load_from_dict(self, d):
logger.debug('load_from_dict: %s', d)
for ss_param in ['messages', 'replies']:
if d.get(ss_param):
setattr(self, ss_param, d[ss_param]['SS'].split('_'))
for params, key in [(self.STR_PARAMS, 'S'), (self.INT_PARAMS, 'N')]:
for p in params:
if d.get(p):
setattr(self, p, d[p][key])
return self
@staticmethod
def elem_to_str(value):
return {"S": str(value)}
@staticmethod
def elem_to_num(value):
return {"N": str(value)}
@staticmethod
def elem_to_array_of_str(value):
return {"SS": [str(v) for v in value]}
def to_dict(self):
res = {
"messages": self.elem_to_array_of_str(
['%s_%s' % (m[0], m[1]) for m in self.messages]
),
}
for p in self.STR_PARAMS:
res[p] = self.elem_to_str(getattr(self, p))
for p in self.INT_PARAMS:
res[p] = self.elem_to_num(getattr(self, p))
return res
# EOF
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment