Skip to content

Instantly share code, notes, and snippets.

@jimsug
Created December 16, 2021 13:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jimsug/fcfde496a5bfb5b6172149b3de8f69e7 to your computer and use it in GitHub Desktop.
Save jimsug/fcfde496a5bfb5b6172149b3de8f69e7 to your computer and use it in GitHub Desktop.
Timezone bot

Most probably horribly broken. You probably need telepot and timezonefinder

import shelve, datetime, pytz, logging, json, sys
from dateutil.parser import parse
from uuid import uuid4
# telegram bot things
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, ParseMode, InputTextMessageContent
from telegram.ext import Updater, ConversationHandler, CommandHandler, CallbackQueryHandler, InlineQueryHandler, MessageHandler, ChosenInlineResultHandler, Filters
# latlng to tz
from timezonefinder import TimezoneFinder
updater = Updater("<yourapikey>")
WAITING_FOR_MSG, WAITING_FOR_POLL = range(2)
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
logger = logging.getLogger(__name__)
# A blacklist of timezones - apparently a bunch of places are sensitive about things? idk
tz_blacklist = [
"Atlantic/Madeira"
]
country_override = {
"GB":{
"name": "United Kingdom"
}
}
std_fmt = "%b %-d %Y, %H:%M %Z"
alphakeyboard = [
[
InlineKeyboardButton("A", callback_data='country_A'),
InlineKeyboardButton("B", callback_data='country_B'),
InlineKeyboardButton("C", callback_data='country_C'),
InlineKeyboardButton("D", callback_data='country_D'),
InlineKeyboardButton("E", callback_data='country_E')
],
[
InlineKeyboardButton("F", callback_data='country_F'),
InlineKeyboardButton("G", callback_data='country_G'),
InlineKeyboardButton("H", callback_data='country_H'),
InlineKeyboardButton("I", callback_data='country_I'),
InlineKeyboardButton("J", callback_data='country_J')
],
[
InlineKeyboardButton("K", callback_data='country_K'),
InlineKeyboardButton("L", callback_data='country_L'),
InlineKeyboardButton("M", callback_data='country_M'),
InlineKeyboardButton("N", callback_data='country_N'),
InlineKeyboardButton("O", callback_data='country_O')
],
[
InlineKeyboardButton("P", callback_data='country_P'),
InlineKeyboardButton("Q", callback_data='country_Q'),
InlineKeyboardButton("R", callback_data='country_R'),
InlineKeyboardButton("S", callback_data='country_S'),
InlineKeyboardButton("T", callback_data='country_T')
],
[
InlineKeyboardButton("U", callback_data='country_U'),
InlineKeyboardButton("V", callback_data='country_V'),
InlineKeyboardButton("W", callback_data='country_W'),
InlineKeyboardButton("X", callback_data='country_X'),
InlineKeyboardButton("Y", callback_data='country_Y'),
InlineKeyboardButton("Z", callback_data='country_Z')
],
[
InlineKeyboardButton("✖️ Cancel", callback_data='cancel')
]
]
poll_keyboard = [[
InlineKeyboardButton("Add/remove me", callback_data="poll_toggle_{}")
]]
poll_markup = InlineKeyboardMarkup(poll_keyboard)
def error(bot, update, error):
logger.warning('Update "%s" caused error "%s"', update, error)
def keyboard_sort(a, b):
if a.text < b.text:
return +1
elif a.text > b.text:
return 1
else:
return 0
def inlinequery(bot, update):
query = update.inline_query.query
keyboard = [[
InlineKeyboardButton("Show in my timezone", callback_data="convert"),
InlineKeyboardButton("Set my timezone", url="https://telegram.me/restzbot")
]]
markup = InlineKeyboardMarkup(keyboard)
results = []
user_messages = get_user_message_by_id(update.inline_query.from_user.id, query) or get_user_messages(update.inline_query.from_user.id)
if not user_messages:
results = []
elif len(user_messages) > 1:
result_messages = {}
for msg_uuid, msg in user_messages.items():
if query in msg['message']:
result_messages[msg_uuid] = msg
if not result_messages:
result_message = user_messages
else:
result_messages = user_messages
if result_messages:
for msg_uuid, msg in result_messages.items():
if (msg_uuid.startswith("poll_")):
results.append(InlineQueryResultArticle(
id=msg_uuid,
title="Poll",
input_message_content=InputTextMessageContent(
message_text=msg['message']
),
reply_markup=poll_markup
))
else:
results.append(InlineQueryResultArticle(
id=msg_uuid,
title=msg['message'],
input_message_content=InputTextMessageContent(
message_text=msg['message']
),
reply_markup=markup
))
update.inline_query.answer(results, is_personal=True, cache_time=60)
def mytz(bot, update):
user_tz = timezone_get(update.message.from_user.id)
keyboard = alphakeyboard
reply_markup = InlineKeyboardMarkup(keyboard)
reply_message = ""
if user_tz:
reply_message += "Your timezone is currently set to `{}`\n".format(user_tz)
reply_message += "Please choose the first letter of your country"
return update.message.reply_text(reply_message, parse_mode="Markdown", reply_markup=reply_markup)
def button_callbacks(bot, update):
query = update.callback_query
if query.data.startswith("poll_toggle"):
return toggle_user_on_poll(bot, update)
if query.data.startswith("poll_close_"):
return close_poll(bot, update, query.data[11:])
if query.data.startswith("delete_"):
return delete_message(bot, update, query.data[7:])
if query.data == "convert":
return convert_message_to_local(bot, update)
if query.data == "get_times in local":
return send_local_tz(bot, update)
if query.data == "cancel":
return query.edit_message_text("Command cancelled")
if query.data.startswith("tz_"):
return set_timezone(bot, update, query.data[3:])
if query.data.startswith("country_code_"):
return reply_timezones(bot, update, query.data[-2:])
if query.data.startswith("country_"):
return reply_countries(bot, update, query.data[-1])
if query.data == "alphalist":
return country_list(bot, update)
def toggle_user_on_poll(bot, update):
inline_query_id = update.callback_query.inline_message_id
msg_info = get_message(inline_query_id)
user_id = update.callback_query.from_user.id
user_name = update.callback_query.from_user.username or update.callback_query.from_user.first_name
user_tz = timezone_get(user_id) or False
if not user_tz:
return update.callback_query.answer(
text='Your timezone is not set',
show_alert=True,
cache_time=20
)
if user_id in [user[0] for user in msg_info['respondents']]:
for user in msg_info:
if user[0] == user_id:
msg_info.remove(user)
response_text = 'Your response has been retracted'
else:
msg_info['respondents'].append([user_id, user_name])
response_text = 'Your response has been recorded'
update_chosen(inline_query_id, msg_info)
poll_refresh(bot, update, msg_info)
return update.callback_query.answer(
text=response_text
)
def update_chosen(inline_query_id, msg_info):
with shelve.open("data/chosen", "w") as db:
del db[inline_query_id]
db[inline_query_id] = msg_info
return
def poll_refresh(bot, update, msg_info):
original_message = msg_info['message']
timezones = {}
for user in msg_info['respondents']:
user_id = user[0]
user_name = user[1]
offset = datetime.datetime.now(pytz.timezone(timezone_get(user_id))).strftime("%z")
try:
timezones[offset].append(user_name)
except KeyError:
timezones[offset] = [user_name]
tz_list = ""
print(timezones)
for offset in [key for key in timezones.keys()].sort(key=lambda x: int(x)):
tz_list += "{}:\n{}".format(offset, "\n- ".join(timezones[offset]["users"]))
return update.callback_query.edit_message_text("{}\n{}".format(original_message, tz_list), parse_mode="Markdown", markup=poll_markup)
def poll_close(bot, update, poll_id):
return
def extract_timestamps(input_string):
results = []
input_lines = input_string.split("\n")
for line in input_lines:
finished = False
while not finished:
try:
parsed_dt = parse(line, fuzzy=True, fuzzy_with_tokens=True, dayfirst=True)
except ValueError:
parsed_dt = False
if parsed_dt:
results.append(parsed_dt[0])
line = " ".join(parsed_dt[1])
else:
finished = True
logging.info(results)
return results
def convert_message_to_local(bot, update):
inline_query_id = update.callback_query.inline_message_id
msg_info = get_message(inline_query_id)
msg_text = msg_info['message']
msg_tz = msg_info['user_tz'] or "UTC"
user_tz = timezone_get(update.callback_query.from_user.id) or "UTC"
response_text = msg_text
dates = msg_info["parsed_dates"]
converted_dates = []
target_tz = pytz.timezone(user_tz)
default_tz = pytz.timezone(msg_tz)
for dt in dates:
converted_dt = dt.astimezone(target_tz)
# converted_dates.append([str(dt), str(converted_dt)])
converted_dates.append([dt.strftime(std_fmt), converted_dt.strftime(std_fmt)])
alert_to_show = ""
for times in converted_dates:
string_to_add = "{} = {}".format(times[0], times[1])
if len(string_to_add) + len(alert_to_show) < 198:
alert_to_show += "{}\n".format(string_to_add)
response_text = alert_to_show
return update.callback_query.answer(
text=response_text,
show_alert=True,
cache_time=30
)
def create_poll(bot, update):
update.message.reply_text(
"Send me the text to be attached to your poll"
)
return WAITING_FOR_POLL
def create_message(bot, update):
update.message.reply_text(
"Send me a message or send /cancel to cancel"
)
return WAITING_FOR_MSG
def cancel_message_create(bot, update):
update.message.reply_text(
"Message creation cancelled.",
parse_mode="Markdown"
)
return ConversationHandler.END
def delete_message(bot, update, message_id):
if message_delete(update.callback_query.from_user.id, message_id):
return update.callback_query.edit_message_text("_This message was deleted._", parse_mode="Markdown")
def message_delete(user_id, message_id):
user_id = str(user_id)
with shelve.open("data/messages", "w") as db:
messages = db[user_id]
del messages[message_id]
del db[user_id]
db[user_id] = messages
return True
def store_poll(bot, update):
user_id = str(update.message.from_user.id)
message = update.message.text
with shelve.open("data/polls", "c") as db:
msg_uuid = uuid4().urn[9:]
try:
messages = db[user_id]
except KeyError:
db[user_id] = {}
messages = {}
messages[msg_uuid] = {
"message": "📊 Timezone Poll\n{}".format(message),
"respondents": []
}
del db[user_id]
db[user_id] = messages
keyboard = [[
InlineKeyboardButton(
"❌ Delete this message",
callback_data='delete_poll_{}'.format(msg_uuid)),
InlineKeyboardButton("💬 Try it!",
switch_inline_query="poll_{}".format(msg_uuid)),
InlineKeyboardButton("💬 Try it here!",
switch_inline_query_current_chat="poll_{}".format(msg_uuid))
]]
markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("Poll created!",
reply_markup=markup,
reply_to_message_id=update.message.message_id)
return ConversationHandler.END
def store_message(bot, update):
user_id = str(update.message.from_user.id)
message = update.message.text
message_lines = message.split("\n")
parsed_dates = extract_timestamps(message)
if not parsed_dates:
update.message.reply_text("I couldn't find a date/time in this message. Try again, or send /cancel to cancel.")
return WAITING_FOR_MSG
user_tz = timezone_get(user_id)
with shelve.open("data/messages", "w") as db:
msg_uuid = uuid4().urn[9:]
try:
messages = db[user_id]
except KeyError:
db[user_id] = {}
messages = {}
stored_dates = []
default_tz = pytz.timezone(user_tz)
for dt in parsed_dates:
if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
dt = default_tz.localize(dt)
stored_dates.append(dt)
messages[msg_uuid] = {
"message": message,
"user_tz": user_tz,
"parsed_dates": stored_dates
}
del db[user_id]
db[user_id] = messages
keyboard = [[
InlineKeyboardButton("❌ Delete this message", callback_data='delete_{}'.format(msg_uuid)),
InlineKeyboardButton("💬 Try it!", switch_inline_query="{}".format(msg_uuid)),
InlineKeyboardButton("💬 Try it here!", switch_inline_query_current_chat="{}".format(msg_uuid))
]]
markup = InlineKeyboardMarkup(keyboard)
update.message.reply_text("Message created. The date/time(s) detected was: {} (approx {} chars when converted - only 200 characters will be shown)".format(
", ".join([ sdt.strftime(std_fmt) for sdt in parsed_dates])
, (len(std_fmt) + 4) * len(parsed_dates)
),
reply_markup=markup,
reply_to_message_id=update.message.message_id
)
return ConversationHandler.END
def get_message(inline_query_id):
with shelve.open("data/chosen", "r") as db:
try:
message = db[inline_query_id]
except KeyError:
message = False
return message
def get_user_message_by_id(user_id, message_uuid):
user_id = str(user_id)
message_uuid = str(message_uuid)
if (message_uuid.startswith("poll_")):
with shelve.open("data/polls", "r") as db:
try:
return {message_uuid:db[user_id][message_uuid[5:]]}
except KeyError:
return False
else:
with shelve.open("data/messages", "r") as db:
try:
return {message_uuid:db[user_id][message_uuid]}
except KeyError:
return False
return False
def get_user_messages(user_id):
user_id = str(user_id)
with shelve.open("data/messages", "r") as db:
try:
return db[user_id]
except KeyError:
return False
return False
def set_timezone(bot, update, timezone):
tz = timezone_set(update.callback_query.from_user.id, timezone)
if tz:
update.callback_query.edit_message_text("Your timezone was successfully set to {}".format(tz))
else:
update.callback_query.edit_message_text("There was an error setting your timezone. Please try again later.")
def timezone_get(user_id):
user_id = str(user_id)
with shelve.open("data/users", "w") as db:
try:
timezone = db[user_id]
except:
return False
return timezone
def timezone_set(user_id, timezone):
user_id = str(user_id)
with shelve.open("data/users", "w") as db:
db[user_id] = timezone
return timezone
def reply_timezones(bot, update, country_code):
keyboard = []
for tz in pytz.country_timezones[country_code]:
if tz not in tz_blacklist:
keyboard.append([InlineKeyboardButton(tz, callback_data="tz_{}".format(tz))])
keyboard.append(
[
InlineKeyboardButton("✖️ Cancel", callback_data='cancel'),
InlineKeyboardButton("◀️ Back", callback_data='country_{}'.format(pytz.country_names[country_code][0]))
]
)
markup = InlineKeyboardMarkup(keyboard)
update.callback_query.edit_message_text("Select your timezone", reply_markup=markup)
def reply_countries(bot, update, start_letter):
keyboard = []
for country in pytz.country_names:
if pytz.country_names[country].startswith(start_letter) or country.startswith(start_letter):
keyboard.append([InlineKeyboardButton("{} ({})".format(pytz.country_names[country], country.upper()), callback_data='country_code_{}'.format(country))])
for overridecountry, overridecountry_data in country_override.items():
if overridecountry_data["name"].startswith(start_letter):
keyboard.append([InlineKeyboardButton("{} ({})".format(
overridecountry_data["name"],
overridecountry),
callback_data='country_code_{}'.format(overridecountry)
)])
keyboard.sort(key=lambda x: x[0].text)
if keyboard:
keyboard.append([
InlineKeyboardButton("✖️ Cancel", callback_data='cancel'),
InlineKeyboardButton("◀️ Back", callback_data='alphalist'.format(start_letter))
])
markup = InlineKeyboardMarkup(keyboard)
update.callback_query.edit_message_text("Select your country", reply_markup=markup)
else:
update.callback_query.answer("No countries start with that letter, try another one")
return mytz(bot, update)
def country_list(bot, update):
keyboard = alphakeyboard
user_tz = timezone_get(update.callback_query.from_user.id)
reply_message = ""
if user_tz:
reply_message += "Your timezone is currently set to `{}`\n".format(user_tz)
reply_markup = InlineKeyboardMarkup(keyboard)
reply_message += "Please choose the first letter of your country"
return update.callback_query.edit_message_text(reply_message, parse_mode="Markdown", reply_markup=reply_markup)
def get_tz_from_loc(bot, update):
longitude = update.message.location.longitude
latitude = update.message.location.latitude
tf = TimezoneFinder()
tz = tf.timezone_at(lng=longitude, lat=latitude)
if tz:
timezone = timezone_set(update.message.from_user.id, tz)
if timezone:
update.message.reply_text("Your timezone has been set to {}".format(tz))
else:
update.message.reply_text("I couldn't set your timezone from your location. Please set it manually using /mytz")
def on_result_chosen(bot, update):
inline_query_id = update.chosen_inline_result.inline_message_id
chosen_id = update.chosen_inline_result.result_id
user_id = str(update.chosen_inline_result.from_user.id)
user_tz = timezone_get(user_id) or "UTC"
if chosen_id.startswith("poll_"):
with shelve.open("data/polls", "r") as polldb:
message = polldb[user_id][chosen_id[5:]]
else:
with shelve.open("data/messages", "r") as msgdb:
message = msgdb[user_id][chosen_id]
with shelve.open("data/chosen", "w") as db:
db[inline_query_id] = message
def reset_bot(bot, update):
with shelve.open("data/users","n") as db:
pass
with shelve.open("data/messages","n") as db:
pass
with shelve.open("data/chosen","n") as db:
pass
update.message.reply_text("Bot data reset")
def stats(bot, update):
with shelve.open("data/users", "r") as db:
users = len(db)
with shelve.open("data/messages", "r") as db:
creating_users = len(db)
messages = 0
min_msgs = False
max_msgs = 0
for user, umsgs in db.items():
user_msgs = len([msgid for msgid, msginfo in umsgs.items()])
messages += user_msgs
if user_msgs > max_msgs:
max_msgs = user_msgs
if ((user_msgs < min_msgs) or (not min_msgs)) and user_msgs != 0:
min_msgs = user_msgs
with shelve.open("data/chosen", "r") as db:
chosen = len(db)
update.message.reply_text("""📊Stats
👩‍👩‍👧‍👧 {} users have set their timezones
💬 {} users have created {} messages (min: {}, max: {})
🔊 {} messages have been sent in chat
""".format(users, creating_users, messages, min_msgs, max_msgs, chosen), parse_mode="Markdown")
def help_msg(bot, update):
help_msg_text = """You can use this bot to send dynamic date/times, or you can set your timezone with this bot to view dynamic date/times.
- Send your location to set your timezone automatically
- Use /mytz to select your timezone manually
- Use /new to compose a new message to send
- Use this bot in inline mode to send a message with dynamic date/times. Defaults to your timezone unless specified, or UTC if not set.
You can send multiple date/times, but each date/time must be on its own line. Expect problems if you have multiple date/times on the same line. There is also a limit of 200 characters for the conversion alert, so you may need to have multiple dates/times in multiple messages.
The date/time is set based on your set timezone, or UTC if explicitly set (or if you haven't set a timezone) at the time of creating the message. That means that you can change your timezone, create a message in that timezone, then change it back, and the message will preserve the timezone at the time of setting.
Caching: to help with server load, some data is cached - **converted times** are cached for a few minutes, as your timezone is not expected to change very often. **Messages** (which you compose) are also cached in inline mode for a few minutes, so if you don't see the message there immediately, please wait a few minutes before trying.
Privacy: the full text of messages are stored. If you send a message using inline mode the message content is stored even if you delete the message later using the bot. However, your user information is not stored with the message, so you can delete it.
If you are concerned about sensitive information you may wish to split date/time information into a message separate to other content.
If you send the bot your location, your coordinates are used to set your timezone and not stored anywhere."""
return update.message.reply_text(help_msg_text, parse_mode="Markdown")
def restart(bot, update):
global updater
update.message.reply_text("Restarting...")
logger.info("Restarted by {}".format(update.message.from_user.id))
updater.stop()
sys.exit(2)
def main():
global updater
new_message_handler = ConversationHandler(
entry_points=[
CommandHandler(
"new",
create_message
)
],
states={
WAITING_FOR_MSG:[
MessageHandler(
Filters.text,
store_message
),
CommandHandler(
"new",
create_message
)
]
},
fallbacks=[
CommandHandler(
"cancel",
cancel_message_create
)
]
)
new_poll_handler = ConversationHandler(
entry_points=[
CommandHandler(
"newpoll",
create_poll
)
],
states={
WAITING_FOR_POLL:[
MessageHandler(
Filters.text,
store_poll
)
]
},
fallbacks=[
CommandHandler(
"cancel",
cancel_message_create
)
]
)
dp = updater.dispatcher
dp.add_handler(CommandHandler('start', help_msg))
dp.add_handler(CommandHandler('help', help_msg))
dp.add_handler(CommandHandler('mytz', mytz))
# dp.add_handler(CommandHandler('reset', reset_bot, filters=Filters.user(username="jimsug")))
dp.add_handler(CommandHandler('stats', stats, filters=Filters.user(username="jimsug")))
dp.add_handler(CommandHandler('restart', restart, filters=Filters.user(username="jimsug")))
dp.add_handler(CallbackQueryHandler(button_callbacks))
dp.add_handler(MessageHandler(Filters.location, get_tz_from_loc))
dp.add_handler(InlineQueryHandler(inlinequery))
dp.add_handler(ChosenInlineResultHandler(on_result_chosen))
dp.add_handler(new_message_handler)
dp.add_handler(new_poll_handler)
updater.start_polling()
updater.idle()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment