Last active
July 21, 2021 09:52
-
-
Save Mortafix/93a866c71040557c309f0115aa1a2767 to your computer and use it in GitHub Desktop.
AutoConv real example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import warnings | |
from re import match | |
from uuid import uuid4 | |
from autoconv.utils.persistence import AutoconvPersistence | |
from config import Config | |
from funcs import emz, pprint_media, pprint_trailers | |
from pymortafix.telegram import error_handler, standard_message | |
from states import STATE, states_handler | |
from telegram import InlineQueryResultPhoto | |
from telegram.ext import (CallbackQueryHandler, CommandHandler, | |
ConversationHandler, Filters, InlineQueryHandler, | |
MessageHandler, Updater) | |
from tmdb import trailers | |
warnings.filterwarnings("ignore") | |
# COMMANDS -------------------------------------------------------------------- | |
def error(update, context): | |
message = ":children_crossing: *BOT ERROR* :children_crossing:" | |
text = standard_message(update, message, "PopCorn Bot", "https://t.me/poppycorn_bot") | |
if not update.channel_post: | |
error_handler( | |
update, context, Config.ERROR_CHANNEL, text, logger=Config.LOGGER, log=sys.argv[-1] != "DEV" | |
) | |
def handle_text(update, context): | |
update.message.delete() | |
def autoconv_command(update, context): | |
return states_handler.manage_conversation(update, context, delete_first=True) | |
def restart(update, context): | |
states_handler.restart() | |
return states_handler.manage_conversation(update, context, delete_first=True) | |
def reboot(update, context): | |
context.user_data.clear() | |
return states_handler.manage_conversation(update, context, delete_first=True) | |
def inlinequery(update, context): | |
query = update.inline_query.query | |
query = match(r"(movie|tv)(\d+)", query) | |
results = [] | |
if query: | |
media_type, media_id = query.groups() | |
result = pprint_media(media_id, media_type) | |
if not result: | |
return update.inline_query.answer(results) | |
msg, poster, _ = result | |
trailers_dict = trailers(media_id, media_type) | |
if any(trailers_dict.values()): | |
msg += f"\n\n:clapper_board: *Trailer*\n{pprint_trailers(trailers_dict)}" | |
if poster: | |
results = [ | |
InlineQueryResultPhoto( | |
type="photo", | |
id=uuid4(), | |
photo_url=poster, | |
thumb_url=poster, | |
caption=emz(msg), | |
parse_mode="Markdown", | |
) | |
] | |
update.inline_query.answer(results) | |
# MAIN ------------------------------------------------------------------------ | |
def main(): | |
pp = AutoconvPersistence(filename="bot_persistence", bot_token=Config.BOT_TOKEN) | |
updater = Updater(Config.BOT_TOKEN, persistence=pp) | |
dp = updater.dispatcher | |
# conversations | |
autoconv = ConversationHandler( | |
entry_points=[CommandHandler("start", autoconv_command)], | |
states={ | |
STATE: [ | |
CommandHandler("start", restart), | |
MessageHandler(Filters.text, autoconv_command), | |
CallbackQueryHandler(autoconv_command), | |
] | |
}, | |
fallbacks=[CommandHandler("error", error)], | |
name="pcbot-conversation", | |
persistent=True, | |
) | |
# handlers | |
dp.add_handler(InlineQueryHandler(inlinequery)) | |
dp.add_handler(CommandHandler("reboot", reboot)) | |
dp.add_handler(autoconv) | |
dp.add_handler(MessageHandler(Filters.all, handle_text)) | |
dp.add_error_handler(error) | |
# ---------------------------------------------------------------------- | |
updater.start_polling() | |
updater.idle() | |
if __name__ == "__main__": | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from re import search | |
from config import Config | |
from emoji import emojize | |
from telegram import InlineKeyboardButton | |
from tmdb import (discover, info_media, popular, providers, searching, | |
toprated, trailers, upcoming) | |
def emz(text): | |
if isinstance(text, list): | |
return list(map(emz, text)) | |
return emojize(text, use_aliases=True) | |
def flush_context(tdata, extra=True): | |
for flush in ( | |
"extra_msg", | |
"extra" if extra else None, | |
"prev_media" if extra else None, | |
): | |
if flush in tdata.udata: | |
if (p := tdata.udata.get(flush)) and flush in ("extra_msg",): | |
p.delete() | |
tdata.context.user_data.pop(flush) | |
def menu_flush(tdata): | |
flush_context(tdata) | |
tdata.context.user_data.update({"data": {}}) | |
# ----------- | |
def get_media_type(tdata): | |
movie, tv = "movie", "tv" | |
if (media := tdata.sdata.get("media")) and search("Serie", media): | |
return tv | |
return movie | |
def get_media_list(tdata): | |
media_type = get_media_type(tdata) | |
mode = str(tdata.sdata.get("menu")) | |
if search("Popolari", mode): | |
all_services = search("Tutti", tdata.sdata.get("services") or "") | |
return popular(media_type) if all_services else discover(media_type) | |
if search("Migliori", mode): | |
return toprated(media_type) | |
if search("Prossime", mode): | |
return upcoming() | |
query = tdata.sdata.get("searching") | |
return searching(media_type, query) | |
def info_action(tdata): | |
flush_context(tdata, extra=False) | |
if not tdata.udata.get("list"): | |
return emz(":cross_mark: Nessun *media* trovato!") | |
media_id = tdata.udata.get("list_el") | |
media_type = get_media_type(tdata) | |
# change media | |
if not (prev := tdata.udata.get("prev_media")) or prev != media_id: | |
tdata.context.user_data.update({"prev_media": media_id}) | |
msg, poster, overview = pprint_media(media_id, media_type) | |
trailer = trailers(media_id, media_type) | |
tdata.context.user_data.update( | |
{"extra": {"poster": poster, "trailer": trailer, "overview": overview}} | |
) | |
else: | |
msg, _, _ = pprint_media(media_id, media_type) | |
# extra message | |
if any("Trailer" in v for v in tdata.sdata.values()): | |
trailers_dict = tdata.udata.get("extra").get("trailer") | |
extra = tdata.context.bot.send_message( | |
tdata.telegram_id, pprint_trailers(trailers_dict), parse_mode="Markdown" | |
) | |
elif any("Trama" in v for v in tdata.sdata.values()): | |
overview = tdata.udata.get("extra").get("overview") | |
extra = tdata.context.bot.send_message( | |
tdata.telegram_id, | |
emz(f":thought_balloon: _{overview}_"), | |
parse_mode="Markdown", | |
) | |
elif (extra := tdata.udata.get("extra")) and (poster := extra.get("poster")): | |
extra = tdata.context.bot.send_photo(tdata.telegram_id, poster) | |
else: | |
extra = None | |
tdata.context.user_data.update({"extra_msg": extra}) | |
return emz(msg) | |
def extra_button(tdata): | |
if not tdata.udata.get("list"): | |
return [] | |
if (extra := tdata.udata.get("extra")) : | |
poster = extra.get("poster") | |
trailer = any(extra.get("trailer").values()) | |
overview = extra.get("overview") | |
else: | |
poster, trailer, overview = None, None, None | |
keyboard_lables = [ | |
":sunset: Copertina", | |
":clapper_board: Trailer", | |
":bookmark: Trama", | |
] | |
keyboard = [ | |
l | |
for s, l in zip((poster, trailer, overview), emz(keyboard_lables)) | |
if s and l not in tdata.sdata.values() | |
] | |
return ( | |
keyboard | |
if any(emz(label) in tdata.sdata.values() for label in keyboard_lables) | |
else keyboard[1:] | |
) | |
def info_media_keyboard(tdata): | |
if not tdata.udata.get("list"): | |
return [] | |
media_type = get_media_type(tdata) | |
media_id = tdata.udata.get("list_el") | |
return [ | |
[ | |
InlineKeyboardButton(b, callback_data=i) | |
for i, b in enumerate(extra_button(tdata)) | |
], | |
[ | |
InlineKeyboardButton( | |
emz(":party_popper: Condividi"), | |
switch_inline_query=f"{media_type}{media_id}", | |
) | |
], | |
[InlineKeyboardButton(emz(":globe_with_meridians: Menù"), callback_data=99)], | |
] | |
# ----------- | |
def pprint_media(media_id, media_type): | |
title, overview, release_date, vote, genres, poster = info_media( | |
media_id, media_type | |
) | |
if not title: | |
return None | |
providers_info = pprint_providers(providers(media_id, media_type)) | |
# info to str | |
vote = f":trophy: *{vote*10:.0f}*\n" if vote else "" | |
date = ( | |
f":calendar: {datetime.fromisoformat(release_date):%d.%m.%Y}\n" | |
if release_date | |
else "" | |
) | |
genres = f":admission_tickets: {', '.join(genres)}" if genres else "" | |
# msg creation | |
msg = ( | |
f":popcorn: *{title.upper()}* :popcorn:\n\n{date}{vote}{genres}{providers_info}" | |
) | |
poster = poster and Config.TMDB_IMAGES_URL + poster | |
return msg, poster, overview | |
def pprint_providers(providers): | |
flatrate, rent, buy = ( | |
providers.get("flatrate"), | |
providers.get("rent"), | |
providers.get("buy"), | |
) | |
f_msg, r_msg, b_msg, msg = None, None, None, "" | |
if flatrate: | |
f_msg = f":desktop_computer: {', '.join(f'*{m}*' for m in flatrate)}" | |
if rent: | |
r_msg = f":ticket: {', '.join(f'*{m}*' for m in rent)}" | |
if buy: | |
b_msg = f":euro_banknote: {', '.join(f'*{m}*' for m in buy)}" | |
if rent or buy or flatrate: | |
msg = "\n\n" + "\n".join(m for m in (f_msg, r_msg, b_msg) if m) | |
return emz(msg) | |
def pprint_trailers(trailers): | |
emoji_flags = {"ENG": ":United_Kingdom:", "ITA": ":Italy:"} | |
msg = "" | |
for lang, trailer in trailers.items(): | |
if trailer: | |
msg += f"{emoji_flags.get(lang)} [{trailer[0]}]\n" | |
return emz(msg) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
from re import search | |
from autoconv.autoconv_handler import AutoConvHandler | |
from autoconv.conversation import Conversation | |
from autoconv.state import State | |
from config import Config | |
from emoji import emojize | |
from funcs import (emz, flush_context, get_media_list, info_action, | |
info_media_keyboard, menu_flush) | |
from pymortafix.telegram import error_handler, standard_message | |
STATE, states_handler = range(2) | |
def error_action(tdata): | |
message = ":children_crossing: *BOT ERROR* :children_crossing:" | |
text = standard_message(tdata.update, message, "PopCorn Bot", "https://t.me/poppycorn_bot") | |
if not tdata.update.channel_post: | |
error_handler( | |
tdata.update, tdata.context, Config.ERROR_CHANNEL, text, logger=Config.LOGGER, exception=tdata.exception, log=sys.argv[-1] != "DEV" | |
) | |
flush_context(tdata) | |
return "Qualcosa è andato *storto*, mi spiace." | |
# States | |
error = State("error", "@@@", back_button=False) | |
error.add_action(error_action) | |
error.add_keyboard(["Menu"]) | |
menu = State("menu", back_button=False) | |
menu_keyboard = [ | |
":fire: Popolari", | |
":crystal_ball: Prossime Uscite", | |
":1st_place_medal: Migliori", | |
":magnifying_glass_tilted_right: Cerca", | |
] | |
menu.add_keyboard(emz(menu_keyboard), (2, 2)) | |
menu.add_action(menu_flush) | |
media = State("media") | |
media.add_keyboard(emz([":film_projector: Film", ":television: Serie TV"])) | |
media.add_action(flush_context) | |
services = State("services") | |
services.add_keyboard(emz([":globe_showing_Europe-Africa: Tutti", ":Italy: Migliori"])) | |
services.add_action(flush_context) | |
searching = State("searching", data_type=str) | |
searching.add_text() | |
searching.add_action(flush_context) | |
show_list = State("showlist", "@@@", back_button=False) | |
show_list.add_dynamic_list( | |
get_media_list, | |
left_button=emojize(":reverse_button:"), | |
right_button=emojize(":play_button:"), | |
) | |
show_list.add_action(info_action) | |
show_list.add_custom_keyboard(info_media_keyboard) | |
show_list.set_long_task(emz(":magnifying_glass_tilted_left: Searching...")) | |
# Dynamic routes | |
def media_routes(tdata): | |
menu_var = tdata.sdata.get("menu") | |
if not menu_var: | |
return (None, services, menu) | |
if search("Cerca", menu_var): | |
return (None, searching, menu) | |
if search("Migliori", menu_var): | |
return (None, show_list, menu) | |
return (None, services, menu) | |
# Routes | |
conv = Conversation(menu, fallback_state=error, state_messages="texts.yaml") | |
conv.set_defaults( | |
back_button=emojize(":BACK_arrow: Indietro"), | |
func=emz, | |
params={"parse_mode": "Markdown"}, | |
) | |
conv.add_routes(error, default=menu) | |
conv.add_routes(menu, default=media, routes={1: show_list}) | |
conv.add_routes(media, default=services, back=menu) | |
media.add_dynamic_routes(media_routes) | |
conv.add_routes(services, default=show_list, back=media) | |
conv.add_routes(searching, default=show_list, back=media) | |
conv.add_routes(show_list, default=show_list, routes={99: menu}) | |
# Autoconv | |
states_handler = AutoConvHandler(conv, STATE) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment