Most probably horribly broken. You probably need python-telegram-bot (the code imports the `telegram` package, not telepot), pytz, python-dateutil and timezonefinder
Created
December 16, 2021 13:17
-
-
Save jimsug/fcfde496a5bfb5b6172149b3de8f69e7 to your computer and use it in GitHub Desktop.
Timezone bot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import dbm
import json
import logging
import shelve
import sys
from uuid import uuid4

import pytz
from dateutil.parser import parse
# telegram bot things
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, ParseMode, InputTextMessageContent
from telegram.ext import Updater, ConversationHandler, CommandHandler, CallbackQueryHandler, InlineQueryHandler, MessageHandler, ChosenInlineResultHandler, Filters
# latlng to tz
from timezonefinder import TimezoneFinder
# The single Updater shared by main() and restart(); replace the placeholder
# with a real bot token before running.
updater = Updater("<yourapikey>")

# Conversation states returned by the /new and /newpoll conversation handlers.
WAITING_FOR_MSG, WAITING_FOR_POLL = 0, 1

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Timezones that are never offered in the timezone picker.
tz_blacklist = ["Atlantic/Madeira"]

# Extra country entries offered under a different display name than the one
# pytz provides, keyed by country code.
country_override = {
    "GB": {"name": "United Kingdom"},
}

# strftime format used for every date shown to users.
# NOTE(review): "%-d" is a platform-specific strftime extension - confirm the
# deployment platform supports it.
std_fmt = "%b %-d %Y, %H:%M %Z"
# Letter rows of the "first letter of your country" picker; each button sends
# callback data "country_<LETTER>".
_ALPHA_ROWS = ("ABCDE", "FGHIJ", "KLMNO", "PQRST", "UVWXYZ")

alphakeyboard = [
    [
        InlineKeyboardButton(letter, callback_data='country_' + letter)
        for letter in row
    ]
    for row in _ALPHA_ROWS
]
# The final row only carries the cancel button.
alphakeyboard.append([InlineKeyboardButton("✖️ Cancel", callback_data='cancel')])

# Keyboard attached to every poll message sent through inline mode.
poll_keyboard = [
    [InlineKeyboardButton("Add/remove me", callback_data="poll_toggle_{}")],
]
poll_markup = InlineKeyboardMarkup(poll_keyboard)
def error(bot, update, error):
    """Log, at warning level, the update and the error it caused."""
    logger.warning(
        'Update "%s" caused error "%s"',
        update,
        error,
    )
def keyboard_sort(a, b):
    """Three-way comparison of two buttons by their ``text`` attribute.

    Returns -1 when ``a`` sorts before ``b``, 1 when it sorts after and 0 when
    both texts are equal (the classic ``cmp`` contract, usable through
    ``functools.cmp_to_key``).

    The previous implementation returned ``+1`` for the "less than" case,
    which made it impossible to tell "less" from "greater".
    """
    # (a > b) - (a < b) is the standard idiom for a cmp-style result.
    return (a.text > b.text) - (a.text < b.text)
def inlinequery(bot, update):
    """Answer an inline query with the sender's stored messages or poll.

    The query text is first tried as an exact message/poll id. If that finds
    nothing, all of the sender's messages are loaded and, when there is more
    than one, narrowed to those whose text contains the query. If the filter
    matches nothing, every message is offered instead.
    """
    inline_query = update.inline_query
    query = inline_query.query
    sender_id = inline_query.from_user.id

    markup = InlineKeyboardMarkup([[
        InlineKeyboardButton("Show in my timezone", callback_data="convert"),
        InlineKeyboardButton("Set my timezone", url="https://telegram.me/restzbot"),
    ]])

    user_messages = (
        get_user_message_by_id(sender_id, query)
        or get_user_messages(sender_id)
    )

    # Always bound, so a user without any stored message gets an empty answer
    # instead of a NameError.
    result_messages = {}
    if user_messages:
        if len(user_messages) > 1:
            result_messages = {
                msg_uuid: msg
                for msg_uuid, msg in user_messages.items()
                if query in msg['message']
            }
        # A single message, or a filter without matches: offer everything.
        if not result_messages:
            result_messages = user_messages

    results = []
    for msg_uuid, msg in result_messages.items():
        is_poll = msg_uuid.startswith("poll_")
        results.append(InlineQueryResultArticle(
            id=msg_uuid,
            title="Poll" if is_poll else msg['message'],
            input_message_content=InputTextMessageContent(
                message_text=msg['message']
            ),
            reply_markup=poll_markup if is_poll else markup,
        ))

    inline_query.answer(results, is_personal=True, cache_time=60)
def mytz(bot, update):
    """Reply to /mytz with the current timezone (if any) and the letter picker."""
    current_tz = timezone_get(update.message.from_user.id)
    letter_picker = InlineKeyboardMarkup(alphakeyboard)

    parts = []
    if current_tz:
        parts.append("Your timezone is currently set to `{}`\n".format(current_tz))
    parts.append("Please choose the first letter of your country")

    return update.message.reply_text(
        "".join(parts),
        parse_mode="Markdown",
        reply_markup=letter_picker,
    )
def button_callbacks(bot, update):
    """Dispatch an inline-keyboard callback to the handler for its data.

    The checks are order-sensitive: longer prefixes such as ``country_code_``
    must be tested before the shorter ``country_`` prefix they start with.
    Returns whatever the selected handler returns, or ``None`` when the
    callback data is not recognised.
    """
    query = update.callback_query
    data = query.data

    if data.startswith("poll_toggle"):
        return toggle_user_on_poll(bot, update)
    if data.startswith("poll_close_"):
        # The function defined in this file is poll_close; the former
        # close_poll name does not exist and raised NameError.
        return poll_close(bot, update, data[11:])
    if data.startswith("delete_"):
        return delete_message(bot, update, data[7:])
    if data == "convert":
        return convert_message_to_local(bot, update)
    if data == "get_times in local":
        # NOTE(review): send_local_tz is not defined in the visible file and no
        # visible keyboard emits this callback data - confirm it exists
        # elsewhere or remove this branch.
        return send_local_tz(bot, update)
    if data == "cancel":
        return query.edit_message_text("Command cancelled")
    if data.startswith("tz_"):
        return set_timezone(bot, update, data[3:])
    if data.startswith("country_code_"):
        # The last two characters are the country code.
        return reply_timezones(bot, update, data[-2:])
    if data.startswith("country_"):
        # The last character is the chosen start letter.
        return reply_countries(bot, update, data[-1])
    if data == "alphalist":
        return country_list(bot, update)
    return None
def toggle_user_on_poll(bot, update):
    """Add the pressing user to a poll, or remove them if already listed.

    The poll is looked up by the inline message id of the pressed button.
    Users without a stored timezone are rejected with an alert. After the
    respondent list changes, the poll record is saved and the poll message is
    re-rendered.
    """
    callback = update.callback_query
    inline_query_id = callback.inline_message_id

    msg_info = get_message(inline_query_id)
    if not msg_info:
        # get_message returns False for unknown ids; nothing to toggle.
        return callback.answer(
            text='This poll could not be found',
            show_alert=True
        )

    user_id = callback.from_user.id
    user_name = callback.from_user.username or callback.from_user.first_name

    if not timezone_get(user_id):
        return callback.answer(
            text='Your timezone is not set',
            show_alert=True,
            cache_time=20
        )

    respondents = msg_info['respondents']
    # Build a new list rather than removing while iterating. Entries are
    # [user_id, user_name] pairs.
    remaining = [entry for entry in respondents if entry[0] != user_id]
    if len(remaining) != len(respondents):
        msg_info['respondents'] = remaining
        response_text = 'Your response has been retracted'
    else:
        respondents.append([user_id, user_name])
        response_text = 'Your response has been recorded'

    update_chosen(inline_query_id, msg_info)
    poll_refresh(bot, update, msg_info)
    return callback.answer(
        text=response_text
    )
def update_chosen(inline_query_id, msg_info):
    """Store ``msg_info`` under ``inline_query_id`` in the ``data/chosen`` shelf.

    Assignment already replaces an existing entry, so no prior delete is
    needed (the old ``del`` raised KeyError for ids that were not stored yet).
    The shelf is opened with ``"c"`` so it is created on first use; the
    ``data`` directory itself must already exist.
    """
    with shelve.open("data/chosen", "c") as db:
        db[inline_query_id] = msg_info
def poll_refresh(bot, update, msg_info):
    """Re-render a poll message with its respondents grouped by UTC offset.

    ``msg_info`` is the stored poll record: ``message`` is the poll text and
    ``respondents`` a list of ``[user_id, user_name]`` pairs. Each respondent's
    current UTC offset (``%z``, e.g. ``+0100``) is derived from their stored
    timezone, falling back to UTC when none is stored. Groups are listed in
    ascending offset order.
    """
    now_utc = datetime.datetime.now(pytz.utc)

    names_by_offset = {}
    for user in msg_info['respondents']:
        user_id = user[0]
        user_name = user[1]
        # timezone_get returns False for unknown users; pytz cannot take that.
        tz_name = timezone_get(user_id) or "UTC"
        offset = now_utc.astimezone(pytz.timezone(tz_name)).strftime("%z")
        names_by_offset.setdefault(offset, []).append(user_name)
    logger.debug("Poll respondents by offset: %s", names_by_offset)

    # sorted() returns the ordered keys; list.sort() returns None and could
    # not be iterated. int("+0530") == 530 and int("-0800") == -800, so the
    # numeric order matches the offset order.
    sections = [
        "{}:\n- {}".format(offset, "\n- ".join(names_by_offset[offset]))
        for offset in sorted(names_by_offset, key=int)
    ]

    # NOTE(review): names are inserted unescaped into a Markdown message;
    # confirm names containing Markdown characters cannot break rendering.
    return update.callback_query.edit_message_text(
        "{}\n{}".format(msg_info['message'], "\n".join(sections)),
        parse_mode="Markdown",
        reply_markup=poll_markup,
    )
def poll_close(bot, update, poll_id):
    """Placeholder for closing a poll; currently does nothing."""
    return None
def extract_timestamps(input_string):
    """Return every date/time that dateutil can fuzzily find in the text.

    The input is processed line by line. Each line is parsed repeatedly:
    after a successful parse, the tokens dateutil skipped are joined with
    spaces and parsed again, until nothing more parses. Dates are read
    day-first.

    Returns a list of ``datetime`` objects in the order they were found.
    """
    results = []
    for line in input_string.split("\n"):
        while True:
            try:
                parsed_dt, skipped_tokens = parse(
                    line, fuzzy=True, fuzzy_with_tokens=True, dayfirst=True
                )
            except (ValueError, OverflowError):
                # ValueError: nothing date-like left on this line.
                # OverflowError: a number too large for a date component.
                break
            results.append(parsed_dt)
            remainder = " ".join(skipped_tokens)
            if remainder == line:
                # Nothing was consumed; parsing again would loop forever.
                break
            line = remainder
    logger.info("Parsed timestamps: %s", results)
    return results
def convert_message_to_local(bot, update):
    """Show the message's stored date/times converted to the presser's timezone.

    The result is sent as an alert on the callback query. Each line reads
    ``<original> = <converted>`` using ``std_fmt``. Lines are only added while
    the alert stays under 198 characters. Users without a stored timezone get
    UTC.
    """
    callback = update.callback_query
    msg_info = get_message(callback.inline_message_id)
    if not msg_info:
        # get_message returns False when the sent message was never recorded.
        return callback.answer(
            text="This message could not be found",
            show_alert=True
        )

    target_tz = pytz.timezone(timezone_get(callback.from_user.id) or "UTC")

    alert_to_show = ""
    for dt in msg_info["parsed_dates"]:
        string_to_add = "{} = {}".format(
            dt.strftime(std_fmt),
            dt.astimezone(target_tz).strftime(std_fmt),
        )
        if len(string_to_add) + len(alert_to_show) < 198:
            alert_to_show += "{}\n".format(string_to_add)

    return callback.answer(
        text=alert_to_show,
        show_alert=True,
        cache_time=30
    )
def create_poll(bot, update):
    """Prompt for the poll text and move the conversation to WAITING_FOR_POLL."""
    prompt = "Send me the text to be attached to your poll"
    update.message.reply_text(prompt)
    return WAITING_FOR_POLL
def create_message(bot, update):
    """Prompt for the message text and move the conversation to WAITING_FOR_MSG."""
    prompt = "Send me a message or send /cancel to cancel"
    update.message.reply_text(prompt)
    return WAITING_FOR_MSG
def cancel_message_create(bot, update):
    """Confirm the cancellation to the user and end the conversation."""
    notice = "Message creation cancelled."
    update.message.reply_text(notice, parse_mode="Markdown")
    return ConversationHandler.END
def delete_message(bot, update, message_id):
    """Delete the presser's stored message and replace the bot message text.

    Returns the result of the edit when the deletion is reported as
    successful, otherwise ``None``.
    """
    callback = update.callback_query
    if not message_delete(callback.from_user.id, message_id):
        return None
    return callback.edit_message_text(
        "_This message was deleted._",
        parse_mode="Markdown",
    )
def message_delete(user_id, message_id):
    """Remove one stored message or poll belonging to ``user_id``.

    Ids starting with ``poll_`` refer to polls and are looked up, without the
    prefix, in the ``data/polls`` shelf. This matches the ``delete_poll_<id>``
    callback data of the poll "delete" button. Every other id is looked up in
    ``data/messages``.

    Returns ``True`` when the entry was removed and ``False`` when the user or
    the entry is unknown (previously a KeyError).
    """
    user_id = str(user_id)
    message_id = str(message_id)

    if message_id.startswith("poll_"):
        db_path, key = "data/polls", message_id[5:]
    else:
        db_path, key = "data/messages", message_id

    with shelve.open(db_path, "w") as db:
        try:
            messages = db[user_id]
            del messages[key]
        except KeyError:
            return False
        # Shelf values are copies; write the modified dict back to persist it.
        db[user_id] = messages
    return True
def store_poll(bot, update):
    """Save the received text as a new poll and reply with its action buttons.

    The poll is stored in the ``data/polls`` shelf under the sender's id with
    a fresh UUID and an empty respondent list. Ends the conversation.
    """
    incoming = update.message
    owner_id = str(incoming.from_user.id)
    poll_text = incoming.text

    with shelve.open("data/polls", "c") as db:
        poll_id = str(uuid4())
        polls = db.get(owner_id, {})
        polls[poll_id] = {
            "message": "📊 Timezone Poll\n{}".format(poll_text),
            "respondents": [],
        }
        db[owner_id] = polls

    inline_id = "poll_{}".format(poll_id)
    buttons = [
        InlineKeyboardButton(
            "❌ Delete this message",
            callback_data='delete_poll_{}'.format(poll_id)),
        InlineKeyboardButton(
            "💬 Try it!",
            switch_inline_query=inline_id),
        InlineKeyboardButton(
            "💬 Try it here!",
            switch_inline_query_current_chat=inline_id),
    ]
    incoming.reply_text(
        "Poll created!",
        reply_markup=InlineKeyboardMarkup([buttons]),
        reply_to_message_id=incoming.message_id,
    )
    return ConversationHandler.END
def store_message(bot, update):
    """Save the received text and its detected date/times as a new message.

    When no date/time is found, the user is asked to try again and the
    conversation stays in ``WAITING_FOR_MSG``. Otherwise, date/times without a
    timezone are localised to the sender's stored timezone (UTC when none is
    stored). The record is written to the ``data/messages`` shelf under a
    fresh UUID, and the user gets a confirmation with delete/try buttons.
    Ends the conversation on success.
    """
    user_id = str(update.message.from_user.id)
    message = update.message.text

    parsed_dates = extract_timestamps(message)
    if not parsed_dates:
        update.message.reply_text("I couldn't find a date/time in this message. Try again, or send /cancel to cancel.")
        return WAITING_FOR_MSG

    # timezone_get returns False for users without a timezone; fall back to
    # UTC as the help text promises, instead of crashing in pytz.timezone.
    user_tz = timezone_get(user_id) or "UTC"
    default_tz = pytz.timezone(user_tz)

    stored_dates = []
    for dt in parsed_dates:
        if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
            dt = default_tz.localize(dt)
        stored_dates.append(dt)

    msg_uuid = uuid4().urn[9:]  # strip the "urn:uuid:" prefix
    # "c" creates the shelf on first use; "w" failed when it did not exist.
    with shelve.open("data/messages", "c") as db:
        messages = db.get(user_id, {})
        messages[msg_uuid] = {
            "message": message,
            "user_tz": user_tz,
            "parsed_dates": stored_dates
        }
        db[user_id] = messages

    keyboard = [[
        InlineKeyboardButton("❌ Delete this message", callback_data='delete_{}'.format(msg_uuid)),
        InlineKeyboardButton("💬 Try it!", switch_inline_query="{}".format(msg_uuid)),
        InlineKeyboardButton("💬 Try it here!", switch_inline_query_current_chat="{}".format(msg_uuid))
    ]]
    markup = InlineKeyboardMarkup(keyboard)

    # Format the localised dates so that %Z in std_fmt has a zone to print.
    update.message.reply_text(
        "Message created. The date/time(s) detected was: {} (approx {} chars when converted - only 200 characters will be shown)".format(
            ", ".join(sdt.strftime(std_fmt) for sdt in stored_dates),
            (len(std_fmt) + 4) * len(parsed_dates)
        ),
        reply_markup=markup,
        reply_to_message_id=update.message.message_id
    )
    return ConversationHandler.END
def get_message(inline_query_id):
    """Return the record stored in ``data/chosen`` for the id, or ``False``."""
    with shelve.open("data/chosen", "r") as db:
        return db.get(inline_query_id, False)
def get_user_message_by_id(user_id, message_uuid):
    """Look up one message or poll of a user by its id.

    Ids starting with ``poll_`` are read from ``data/polls`` (keyed without
    the prefix); all others from ``data/messages``. Returns a one-entry dict
    ``{message_uuid: record}`` keyed by the id as given, or ``False`` when the
    user or the entry does not exist.
    """
    owner = str(user_id)
    message_uuid = str(message_uuid)

    is_poll = message_uuid.startswith("poll_")
    shelf_path = "data/polls" if is_poll else "data/messages"
    stored_key = message_uuid[5:] if is_poll else message_uuid

    with shelve.open(shelf_path, "r") as db:
        try:
            record = db[owner][stored_key]
        except KeyError:
            return False
        return {message_uuid: record}
def get_user_messages(user_id):
    """Return all messages stored for the user in ``data/messages``, or ``False``."""
    owner = str(user_id)
    with shelve.open("data/messages", "r") as db:
        return db.get(owner, False)
def set_timezone(bot, update, timezone):
    """Store the chosen timezone for the presser and report the outcome."""
    callback = update.callback_query
    saved_tz = timezone_set(callback.from_user.id, timezone)
    if not saved_tz:
        callback.edit_message_text("There was an error setting your timezone. Please try again later.")
        return
    callback.edit_message_text("Your timezone was successfully set to {}".format(saved_tz))
def timezone_get(user_id):
    """Return the timezone name stored for ``user_id``, or ``False`` if none.

    ``user_id`` may be an int or a string; shelf keys are strings. The
    ``data/users`` shelf is opened read-only, since a lookup needs no write
    access. A shelf that does not exist yet simply means nobody has set a
    timezone, so that case also returns ``False`` instead of raising.
    """
    user_id = str(user_id)
    try:
        with shelve.open("data/users", "r") as db:
            return db[user_id]
    except KeyError:
        # Known database, unknown user.
        return False
    except dbm.error:
        # dbm.error is a tuple of exception classes; raised when the shelf
        # cannot be opened, e.g. because it has not been created yet.
        return False
def timezone_set(user_id, timezone):
    """Store ``timezone`` for ``user_id`` in ``data/users`` and return it.

    ``user_id`` may be an int or a string; shelf keys are strings. The shelf
    is opened with ``"c"`` so the very first write creates it. The previous
    ``"w"`` flag only opens an existing database. The ``data`` directory
    itself must already exist.
    """
    user_id = str(user_id)
    with shelve.open("data/users", "c") as db:
        db[user_id] = timezone
    return timezone
def reply_timezones(bot, update, country_code):
    """Replace the message with one button per timezone of the country.

    Blacklisted timezones are left out. The last row offers "Cancel" and a
    "Back" button that returns to the country list for the first letter of
    the country's pytz name.
    """
    rows = [
        [InlineKeyboardButton(zone, callback_data="tz_{}".format(zone))]
        for zone in pytz.country_timezones[country_code]
        if zone not in tz_blacklist
    ]
    first_letter = pytz.country_names[country_code][0]
    rows.append([
        InlineKeyboardButton("✖️ Cancel", callback_data='cancel'),
        InlineKeyboardButton("◀️ Back", callback_data='country_{}'.format(first_letter)),
    ])
    update.callback_query.edit_message_text(
        "Select your timezone",
        reply_markup=InlineKeyboardMarkup(rows),
    )
def reply_countries(bot, update, start_letter):
    """Replace the message with the countries matching ``start_letter``.

    A pytz country is listed when its name or its country code starts with
    the letter. Entries from ``country_override`` are listed when their name
    does. Buttons are sorted by label and followed by a "Cancel"/"Back" row.
    When nothing matches, the callback is answered with a notice and the
    message is left unchanged.
    """
    keyboard = []
    for country in pytz.country_names:
        country_name = pytz.country_names[country]
        if country_name.startswith(start_letter) or country.startswith(start_letter):
            keyboard.append([InlineKeyboardButton(
                "{} ({})".format(country_name, country.upper()),
                callback_data='country_code_{}'.format(country)
            )])
    for overridecountry, overridecountry_data in country_override.items():
        if overridecountry_data["name"].startswith(start_letter):
            keyboard.append([InlineKeyboardButton(
                "{} ({})".format(overridecountry_data["name"], overridecountry),
                callback_data='country_code_{}'.format(overridecountry)
            )])

    if not keyboard:
        # This runs for a callback query, where update.message is None, so
        # the former fall-through into mytz() could only fail. Answering the
        # callback is enough.
        return update.callback_query.answer("No countries start with that letter, try another one")

    keyboard.sort(key=lambda row: row[0].text)
    keyboard.append([
        InlineKeyboardButton("✖️ Cancel", callback_data='cancel'),
        InlineKeyboardButton("◀️ Back", callback_data='alphalist')
    ])
    markup = InlineKeyboardMarkup(keyboard)
    update.callback_query.edit_message_text("Select your country", reply_markup=markup)
def country_list(bot, update):
    """Edit the callback's message back to the first-letter picker."""
    callback = update.callback_query
    current_tz = timezone_get(callback.from_user.id)

    if current_tz:
        prefix = "Your timezone is currently set to `{}`\n".format(current_tz)
    else:
        prefix = ""

    return callback.edit_message_text(
        prefix + "Please choose the first letter of your country",
        parse_mode="Markdown",
        reply_markup=InlineKeyboardMarkup(alphakeyboard),
    )
def get_tz_from_loc(bot, update):
    """Set the sender's timezone from a shared location.

    The coordinates are resolved with TimezoneFinder. When no timezone is
    found, the user is pointed to /mytz.
    """
    location = update.message.location
    lng, lat = location.longitude, location.latitude

    found_tz = TimezoneFinder().timezone_at(lng=lng, lat=lat)
    if not found_tz:
        update.message.reply_text("I couldn't set your timezone from your location. Please set it manually using /mytz")
        return

    if timezone_set(update.message.from_user.id, found_tz):
        update.message.reply_text("Your timezone has been set to {}".format(found_tz))
def on_result_chosen(bot, update):
    """Record which stored message or poll was sent through inline mode.

    The chosen record is copied into the ``data/chosen`` shelf under the
    inline message id, so later button presses on that message can find it.
    Result ids starting with ``poll_`` are read from ``data/polls`` (keyed
    without the prefix); all others from ``data/messages``.
    """
    chosen = update.chosen_inline_result
    inline_query_id = chosen.inline_message_id
    if inline_query_id is None:
        # Without an inline message id there is no key to store under.
        return

    chosen_id = chosen.result_id
    user_id = str(chosen.from_user.id)

    if chosen_id.startswith("poll_"):
        source_path, source_key = "data/polls", chosen_id[5:]
    else:
        source_path, source_key = "data/messages", chosen_id

    try:
        with shelve.open(source_path, "r") as source_db:
            message = source_db[user_id][source_key]
    except KeyError:
        # The record is gone, e.g. deleted between the query and the send.
        logger.warning("Chosen result %s not found for user %s", chosen_id, user_id)
        return

    # "c" creates the shelf on first use; "w" failed when it did not exist.
    with shelve.open("data/chosen", "c") as db:
        db[inline_query_id] = message
def reset_bot(bot, update):
    """Empty every data shelf and confirm to the user.

    Opening a shelf with flag ``"n"`` always creates a new, empty database.
    ``data/polls`` is included as well; it was previously left behind even
    though the reply says the bot data was reset.
    """
    for shelf_path in ("data/users", "data/messages", "data/polls", "data/chosen"):
        with shelve.open(shelf_path, "n"):
            pass
    update.message.reply_text("Bot data reset")
def stats(bot, update):
    """Reply with usage counts read from the data shelves.

    Reports the number of users with a timezone, the number of users in the
    message shelf and their total message count, the smallest non-zero and
    the largest per-user message count, and the number of messages recorded
    as sent. Both extremes are 0 when there is nothing to count (the minimum
    used to be printed as ``False``).
    """
    with shelve.open("data/users", "r") as db:
        users = len(db)

    with shelve.open("data/messages", "r") as db:
        per_user_counts = [len(user_messages) for user_messages in db.values()]

    with shelve.open("data/chosen", "r") as db:
        chosen = len(db)

    creating_users = len(per_user_counts)
    messages = sum(per_user_counts)
    max_msgs = max(per_user_counts, default=0)
    # Users whose message dict is empty do not count towards the minimum.
    min_msgs = min((count for count in per_user_counts if count), default=0)

    update.message.reply_text("""📊Stats
👩👩👧👧 {} users have set their timezones
💬 {} users have created {} messages (min: {}, max: {})
🔊 {} messages have been sent in chat
""".format(users, creating_users, messages, min_msgs, max_msgs, chosen), parse_mode="Markdown")
def help_msg(bot, update):
    """Reply with the bot's usage, caching and privacy notes (also used for /start)."""
    usage_text = """You can use this bot to send dynamic date/times, or you can set your timezone with this bot to view dynamic date/times.
- Send your location to set your timezone automatically
- Use /mytz to select your timezone manually
- Use /new to compose a new message to send
- Use this bot in inline mode to send a message with dynamic date/times. Defaults to your timezone unless specified, or UTC if not set.
You can send multiple date/times, but each date/time must be on its own line. Expect problems if you have multiple date/times on the same line. There is also a limit of 200 characters for the conversion alert, so you may need to have multiple dates/times in multiple messages.
The date/time is set based on your set timezone, or UTC if explicitly set (or if you haven't set a timezone) at the time of creating the message. That means that you can change your timezone, create a message in that timezone, then change it back, and the message will preserve the timezone at the time of setting.
Caching: to help with server load, some data is cached - **converted times** are cached for a few minutes, as your timezone is not expected to change very often. **Messages** (which you compose) are also cached in inline mode for a few minutes, so if you don't see the message there immediately, please wait a few minutes before trying.
Privacy: the full text of messages are stored. If you send a message using inline mode the message content is stored even if you delete the message later using the bot. However, your user information is not stored with the message, so you can delete it.
If you are concerned about sensitive information you may wish to split date/time information into a message separate to other content.
If you send the bot your location, your coordinates are used to set your timezone and not stored anywhere."""
    reply = update.message.reply_text(usage_text, parse_mode="Markdown")
    return reply
def restart(bot, update):
    """Announce the restart, log who asked for it, stop the updater and exit.

    Exits with status code 2.
    NOTE(review): this runs inside an update handler; confirm that stopping
    the updater and calling sys.exit from here ends the process as intended.
    """
    message = update.message
    message.reply_text("Restarting...")
    logger.info("Restarted by {}".format(message.from_user.id))
    updater.stop()
    sys.exit(2)
def main():
    """Register every handler on the module-level updater and start polling.

    Blocks in ``updater.idle()`` until the process is stopped.
    """
    new_message_handler = ConversationHandler(
        entry_points=[CommandHandler("new", create_message)],
        states={
            WAITING_FOR_MSG: [
                MessageHandler(Filters.text, store_message),
                # A repeated /new simply prompts again.
                CommandHandler("new", create_message),
            ],
        },
        fallbacks=[CommandHandler("cancel", cancel_message_create)],
    )
    new_poll_handler = ConversationHandler(
        entry_points=[CommandHandler("newpoll", create_poll)],
        states={
            WAITING_FOR_POLL: [
                MessageHandler(Filters.text, store_poll),
            ],
        },
        fallbacks=[CommandHandler("cancel", cancel_message_create)],
    )

    dp = updater.dispatcher
    dp.add_handler(CommandHandler('start', help_msg))
    dp.add_handler(CommandHandler('help', help_msg))
    dp.add_handler(CommandHandler('mytz', mytz))
    # dp.add_handler(CommandHandler('reset', reset_bot, filters=Filters.user(username="jimsug")))
    dp.add_handler(CommandHandler('stats', stats, filters=Filters.user(username="jimsug")))
    dp.add_handler(CommandHandler('restart', restart, filters=Filters.user(username="jimsug")))
    dp.add_handler(CallbackQueryHandler(button_callbacks))
    dp.add_handler(MessageHandler(Filters.location, get_tz_from_loc))
    dp.add_handler(InlineQueryHandler(inlinequery))
    dp.add_handler(ChosenInlineResultHandler(on_result_chosen))
    dp.add_handler(new_message_handler)
    dp.add_handler(new_poll_handler)

    # error() was defined but never registered, so it was never called.
    dp.add_error_handler(error)

    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment