Skip to content

Instantly share code, notes, and snippets.

@Mortafix
Last active July 21, 2021 09:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Mortafix/93a866c71040557c309f0115aa1a2767 to your computer and use it in GitHub Desktop.
Save Mortafix/93a866c71040557c309f0115aa1a2767 to your computer and use it in GitHub Desktop.
AutoConv real example
import sys
import warnings
from re import match
from uuid import uuid4
from autoconv.utils.persistence import AutoconvPersistence
from config import Config
from funcs import emz, pprint_media, pprint_trailers
from pymortafix.telegram import error_handler, standard_message
from states import STATE, states_handler
from telegram import InlineQueryResultPhoto
from telegram.ext import (CallbackQueryHandler, CommandHandler,
ConversationHandler, Filters, InlineQueryHandler,
MessageHandler, Updater)
from tmdb import trailers
# Install a global "ignore" filter: with no category/module arguments this
# suppresses every warning raised anywhere in the process.
warnings.filterwarnings("ignore")
# COMMANDS --------------------------------------------------------------------
def error(update, context):
    """Report a bot error through pymortafix's ``error_handler``.

    The report text is always built with ``standard_message``; it is only
    forwarded to ``Config.ERROR_CHANNEL`` when the update is not a channel
    post.

    Args:
        update: The Telegram update being processed when the error occurred.
        context: The callback context supplied by the dispatcher.
    """
    report = standard_message(
        update,
        ":children_crossing: *BOT ERROR* :children_crossing:",
        "PopCorn Bot",
        "https://t.me/poppycorn_bot",
    )
    if update.channel_post:
        # Channel posts are deliberately not reported.
        return
    # The ``log`` flag is switched off when the last CLI argument is "DEV".
    should_log = sys.argv[-1] != "DEV"
    error_handler(
        update,
        context,
        Config.ERROR_CHANNEL,
        report,
        logger=Config.LOGGER,
        log=should_log,
    )
def handle_text(update, context):
    """Delete a message that no other handler consumed.

    This callback is registered with ``Filters.all``, so it may be invoked
    for updates that carry no ``message`` (for example edited messages or
    channel posts). Those updates are ignored instead of raising
    ``AttributeError`` on ``None.delete()``.

    Args:
        update: The incoming Telegram update.
        context: The callback context supplied by the dispatcher (unused).
    """
    message = update.message
    if message is None:
        return
    message.delete()
def autoconv_command(update, context):
    """Hand the update over to the AutoConv handler.

    Args:
        update: The incoming Telegram update.
        context: The callback context supplied by the dispatcher.

    Returns:
        Whatever ``states_handler.manage_conversation`` returns; the value is
        passed back to the ``ConversationHandler`` this callback is used in.
    """
    outcome = states_handler.manage_conversation(
        update,
        context,
        delete_first=True,
    )
    return outcome
def restart(update, context):
    """Restart the AutoConv handler, then process the update as usual.

    Args:
        update: The incoming Telegram update.
        context: The callback context supplied by the dispatcher.

    Returns:
        Whatever ``states_handler.manage_conversation`` returns.
    """
    handler = states_handler
    handler.restart()
    outcome = handler.manage_conversation(update, context, delete_first=True)
    return outcome
def reboot(update, context):
    """Wipe the caller's ``user_data`` and process the update as usual.

    Args:
        update: The incoming Telegram update.
        context: The callback context; its ``user_data`` is emptied in place.

    Returns:
        Whatever ``states_handler.manage_conversation`` returns.
    """
    stored = context.user_data
    stored.clear()
    outcome = states_handler.manage_conversation(
        update,
        context,
        delete_first=True,
    )
    return outcome
def inlinequery(update, context):
    """Answer an inline query of the form ``movie<id>`` or ``tv<id>``.

    A single photo result (poster plus Markdown caption) is returned when the
    query parses, the media can be described and a poster is available. In
    every other case the query is answered with an empty result list.

    Args:
        update: The incoming Telegram update carrying ``inline_query``.
        context: The callback context supplied by the dispatcher (unused).
    """
    inline = update.inline_query
    parsed = match(r"(movie|tv)(\d+)", inline.query)
    if not parsed:
        inline.answer([])
        return None

    media_type, media_id = parsed.groups()
    details = pprint_media(media_id, media_type)
    if not details:
        # pprint_media signals an unknown media with a falsy result.
        return inline.answer([])

    caption, poster_url, _ = details
    available_trailers = trailers(media_id, media_type)
    if any(available_trailers.values()):
        caption += (
            f"\n\n:clapper_board: *Trailer*\n{pprint_trailers(available_trailers)}"
        )

    if not poster_url:
        # A photo result cannot be built without a poster.
        inline.answer([])
        return None

    photo = InlineQueryResultPhoto(
        type="photo",
        id=uuid4(),
        photo_url=poster_url,
        thumb_url=poster_url,
        caption=emz(caption),
        parse_mode="Markdown",
    )
    inline.answer([photo])
    return None
# MAIN ------------------------------------------------------------------------
def main():
    """Create the updater, register every handler and start polling."""
    persistence = AutoconvPersistence(
        filename="bot_persistence", bot_token=Config.BOT_TOKEN
    )
    updater = Updater(Config.BOT_TOKEN, persistence=persistence)
    dispatcher = updater.dispatcher

    # conversations
    entry_points = [CommandHandler("start", autoconv_command)]
    conversation_states = {
        STATE: [
            CommandHandler("start", restart),
            MessageHandler(Filters.text, autoconv_command),
            CallbackQueryHandler(autoconv_command),
        ]
    }
    fallbacks = [CommandHandler("error", error)]
    autoconv = ConversationHandler(
        entry_points=entry_points,
        states=conversation_states,
        fallbacks=fallbacks,
        name="pcbot-conversation",
        persistent=True,
    )

    # handlers (registration order is kept exactly as listed)
    ordered_handlers = (
        InlineQueryHandler(inlinequery),
        CommandHandler("reboot", reboot),
        autoconv,
        MessageHandler(Filters.all, handle_text),
    )
    for handler in ordered_handlers:
        dispatcher.add_handler(handler)
    dispatcher.add_error_handler(error)

    # ----------------------------------------------------------------------
    updater.start_polling()
    updater.idle()


if __name__ == "__main__":
    main()
from datetime import datetime
from re import search
from config import Config
from emoji import emojize
from telegram import InlineKeyboardButton
from tmdb import (discover, info_media, popular, providers, searching,
toprated, trailers, upcoming)
def emz(text):
    """Emojize a string, or every element of a list, using emoji aliases.

    Args:
        text: A string, or a (possibly nested) list of strings.

    Returns:
        The emojized string, or a new list with each element emojized.
    """
    if not isinstance(text, list):
        return emojize(text, use_aliases=True)
    return [emz(item) for item in text]
def flush_context(tdata, extra=True):
    """Remove the per-media entries from the user's stored data.

    ``"extra_msg"`` is always flushed, and the stored object is asked to
    ``delete()`` itself when it is truthy. ``"extra"`` and ``"prev_media"``
    are flushed only when ``extra`` is true.

    Args:
        tdata: Object exposing ``udata`` (read) and ``context.user_data``
            (written).
        extra: Whether the ``"extra"`` and ``"prev_media"`` entries are
            dropped as well.
    """
    keys = ("extra_msg", "extra", "prev_media") if extra else ("extra_msg",)
    for key in keys:
        if key not in tdata.udata:
            continue
        stored = tdata.udata.get(key)
        if key == "extra_msg" and stored:
            stored.delete()
        # Default avoids a KeyError should udata and user_data ever differ.
        tdata.context.user_data.pop(key, None)
def menu_flush(tdata):
    """Flush all per-media entries and reset the stored ``"data"`` dict.

    Args:
        tdata: Object exposing ``udata`` and ``context.user_data``.
    """
    flush_context(tdata)
    tdata.context.user_data["data"] = {}
# -----------
def get_media_type(tdata):
    """Translate the stored ``"media"`` choice into ``"tv"`` or ``"movie"``.

    Args:
        tdata: Object exposing ``sdata``, a mapping of stored choices.

    Returns:
        ``"tv"`` when the stored ``"media"`` value contains ``"Serie"``,
        ``"movie"`` otherwise (including when nothing is stored).
    """
    chosen = tdata.sdata.get("media")
    is_series = bool(chosen) and search("Serie", chosen) is not None
    return "tv" if is_series else "movie"
def get_media_list(tdata):
    """Fetch the media list that matches the user's menu choice.

    The stored ``"menu"`` value selects the source: popular (or ``discover``
    when the stored ``"services"`` value does not contain ``"Tutti"``), top
    rated, upcoming, or a search with the stored ``"searching"`` text.

    Args:
        tdata: Object exposing ``sdata``, a mapping of stored choices.

    Returns:
        Whatever the selected ``tmdb`` function returns.
    """
    kind = get_media_type(tdata)
    menu_choice = str(tdata.sdata.get("menu"))

    if "Popolari" in menu_choice:
        services_choice = tdata.sdata.get("services") or ""
        if search("Tutti", services_choice):
            return popular(kind)
        return discover(kind)
    if "Migliori" in menu_choice:
        return toprated(kind)
    if "Prossime" in menu_choice:
        # upcoming() takes no media type.
        return upcoming()
    return searching(kind, tdata.sdata.get("searching"))
def info_action(tdata):
    """Build the detail text of the selected media and send its extra message.

    The previous extra message is flushed first. When the selected media
    differs from the stored ``"prev_media"``, the poster, trailers and
    overview are cached under ``"extra"``. Depending on the stored choices
    an extra message is then sent: the trailers, the overview, or (by
    default) the poster photo. The sent message is stored as ``"extra_msg"``.

    Args:
        tdata: Object exposing ``udata``, ``sdata``, ``telegram_id`` and
            ``context`` (with ``bot`` and ``user_data``).

    Returns:
        The emojized text of the media, or the "not found" text when the
        list is empty or the media cannot be described.
    """
    not_found = ":cross_mark: Nessun *media* trovato!"
    flush_context(tdata, extra=False)
    if not tdata.udata.get("list"):
        return emz(not_found)
    media_id = tdata.udata.get("list_el")
    media_type = get_media_type(tdata)
    user_data = tdata.context.user_data

    details = pprint_media(media_id, media_type)
    if not details:
        # pprint_media returns None when the media has no title; unpacking it
        # would raise. Drop the cached extras so no stale buttons are shown.
        user_data.pop("extra", None)
        user_data.pop("prev_media", None)
        return emz(not_found)
    msg, poster, overview = details

    # change media
    if not (prev := tdata.udata.get("prev_media")) or prev != media_id:
        trailer = trailers(media_id, media_type)
        # Stored only after every lookup succeeded, so a failure cannot leave
        # prev_media pointing at a media without cached extras.
        user_data.update(
            {
                "prev_media": media_id,
                "extra": {"poster": poster, "trailer": trailer, "overview": overview},
            }
        )

    # extra message
    bot = tdata.context.bot
    cached = tdata.udata.get("extra") or {}
    choices = tdata.sdata.values()
    if any("Trailer" in v for v in choices):
        extra_msg = bot.send_message(
            tdata.telegram_id,
            pprint_trailers(cached.get("trailer")),
            parse_mode="Markdown",
        )
    elif any("Trama" in v for v in choices):
        cached_overview = cached.get("overview")
        extra_msg = bot.send_message(
            tdata.telegram_id,
            emz(f":thought_balloon: _{cached_overview}_"),
            parse_mode="Markdown",
        )
    elif cached_poster := cached.get("poster"):
        extra_msg = bot.send_photo(tdata.telegram_id, cached_poster)
    else:
        extra_msg = None
    user_data.update({"extra_msg": extra_msg})
    return emz(msg)
def extra_button(tdata):
    """Return the labels of the extra buttons to show under a media.

    A label is offered when its content is available in the cached
    ``"extra"`` entry and it is not one of the stored choices. When none of
    the three labels is a stored choice, the poster label is left out as
    well, because ``info_action`` sends the poster by default in that case.

    Args:
        tdata: Object exposing ``udata`` and ``sdata``.

    Returns:
        A list of emojized button labels (possibly empty).
    """
    if not tdata.udata.get("list"):
        return []
    extra = tdata.udata.get("extra") or {}
    poster = extra.get("poster")
    # Tolerate a missing trailer mapping instead of failing on None.values().
    trailer = any((extra.get("trailer") or {}).values())
    overview = extra.get("overview")

    labels = emz(
        [
            ":sunset: Copertina",
            ":clapper_board: Trailer",
            ":bookmark: Trama",
        ]
    )
    poster_label = labels[0]
    chosen = list(tdata.sdata.values())
    buttons = [
        label
        for available, label in zip((poster, trailer, overview), labels)
        if available and label not in chosen
    ]
    if any(label in chosen for label in labels):
        return buttons
    # Default view: hide the poster button by name. Slicing off the first
    # element dropped "Trailer" whenever the media had no poster.
    return [label for label in buttons if label != poster_label]
def info_media_keyboard(tdata):
    """Build the inline keyboard shown under a media.

    Args:
        tdata: Object exposing ``udata`` and ``sdata``.

    Returns:
        An empty list when no media list is stored; otherwise three rows:
        the extra buttons (callback data is their index), the share button
        (switches to the inline query ``<media_type><media_id>``) and the
        menu button (callback data ``99``).
    """
    if not tdata.udata.get("list"):
        return []
    media_type = get_media_type(tdata)
    media_id = tdata.udata.get("list_el")

    extra_row = [
        InlineKeyboardButton(label, callback_data=index)
        for index, label in enumerate(extra_button(tdata))
    ]
    share_row = [
        InlineKeyboardButton(
            emz(":party_popper: Condividi"),
            switch_inline_query=f"{media_type}{media_id}",
        )
    ]
    menu_row = [
        InlineKeyboardButton(emz(":globe_with_meridians: Menù"), callback_data=99)
    ]
    return [extra_row, share_row, menu_row]
# -----------
def pprint_media(media_id, media_type):
    """Format the details of a media as a Markdown message.

    Args:
        media_id: Identifier passed through to the ``tmdb`` helpers.
        media_type: Media type passed through to the ``tmdb`` helpers.

    Returns:
        ``None`` when ``info_media`` yields no title; otherwise a tuple
        ``(message, poster_url, overview)``. ``poster_url`` is the poster
        path prefixed with ``Config.TMDB_IMAGES_URL``, or the falsy poster
        value unchanged.
    """
    title, overview, release_date, vote, genres, poster_path = info_media(
        media_id, media_type
    )
    if not title:
        return None
    providers_text = pprint_providers(providers(media_id, media_type))

    # info to str
    vote_line = ""
    if vote:
        vote_line = f":trophy: *{vote*10:.0f}*\n"
    date_line = ""
    if release_date:
        released = datetime.fromisoformat(release_date)
        date_line = f":calendar: {released:%d.%m.%Y}\n"
    genres_line = ""
    if genres:
        genres_line = f":admission_tickets: {', '.join(genres)}"

    # msg creation
    header = f":popcorn: *{title.upper()}* :popcorn:\n\n"
    msg = f"{header}{date_line}{vote_line}{genres_line}{providers_text}"
    poster_url = Config.TMDB_IMAGES_URL + poster_path if poster_path else poster_path
    return msg, poster_url, overview
def pprint_providers(providers):
    """Format the streaming/rent/buy providers as emojized Markdown lines.

    Args:
        providers: Mapping that may hold ``"flatrate"``, ``"rent"`` and
            ``"buy"`` entries, each an iterable of provider names.

    Returns:
        An emojized string starting with a blank line and holding one line
        per non-empty category, or the emojized empty string when every
        category is empty.
    """
    categories = (
        ("flatrate", ":desktop_computer:"),
        ("rent", ":ticket:"),
        ("buy", ":euro_banknote:"),
    )
    lines = []
    for key, icon in categories:
        names = providers.get(key)
        if not names:
            continue
        bold_names = ", ".join(f"*{name}*" for name in names)
        lines.append(f"{icon} {bold_names}")
    msg = "\n\n" + "\n".join(lines) if lines else ""
    return emz(msg)
def pprint_trailers(trailers):
    """Format the first trailer of each language as an emojized line.

    Args:
        trailers: Mapping of language code to a sequence of trailers.

    Returns:
        An emojized string with one line per language that has at least one
        trailer; languages other than ``"ENG"`` and ``"ITA"`` have no flag
        alias and are rendered with ``None`` in its place.
    """
    emoji_flags = {"ENG": ":United_Kingdom:", "ITA": ":Italy:"}
    lines = [
        f"{emoji_flags.get(lang)} [{entries[0]}]\n"
        for lang, entries in trailers.items()
        if entries
    ]
    return emz("".join(lines))
import sys
from re import search
from autoconv.autoconv_handler import AutoConvHandler
from autoconv.conversation import Conversation
from autoconv.state import State
from config import Config
from emoji import emojize
from funcs import (emz, flush_context, get_media_list, info_action,
info_media_keyboard, menu_flush)
from pymortafix.telegram import error_handler, standard_message
# STATE becomes 0 and is the key handed to AutoConvHandler at the bottom of
# this module. states_handler only receives a placeholder (1) here; it is
# rebound to the AutoConvHandler instance once the conversation is built.
STATE, states_handler = range(2)
def error_action(tdata):
    """Report a conversation error and reset the user's per-media data.

    The report text is always built with ``standard_message``; it is only
    forwarded through ``error_handler`` when the update is not a channel
    post. The stored per-media entries are flushed in every case.

    Args:
        tdata: Object exposing ``update``, ``context``, ``exception``,
            ``udata`` and ``context.user_data``.

    Returns:
        The apology text shown to the user.
    """
    update = tdata.update
    report = standard_message(
        update,
        ":children_crossing: *BOT ERROR* :children_crossing:",
        "PopCorn Bot",
        "https://t.me/poppycorn_bot",
    )
    if not update.channel_post:
        # The ``log`` flag is switched off when the last CLI argument is "DEV".
        error_handler(
            update,
            tdata.context,
            Config.ERROR_CHANNEL,
            report,
            logger=Config.LOGGER,
            exception=tdata.exception,
            log=sys.argv[-1] != "DEV",
        )
    flush_context(tdata)
    return "Qualcosa è andato *storto*, mi spiace."
# States
# Error state: error_action reports the failure and returns the text shown to
# the user; the only button is "Menu".
# NOTE(review): "@@@" is presumably the placeholder autoconv replaces with the
# action's return value - confirm against the autoconv documentation.
error = State("error", "@@@", back_button=False)
error.add_action(error_action)
error.add_keyboard(["Menu"])
# Menu state: four emojized choices; menu_flush clears the per-media data and
# resets the stored "data" dict whenever the state runs.
# NOTE(review): (2, 2) looks like a 2x2 button layout - confirm.
menu = State("menu", back_button=False)
menu_keyboard = [
":fire: Popolari",
":crystal_ball: Prossime Uscite",
":1st_place_medal: Migliori",
":magnifying_glass_tilted_right: Cerca",
]
menu.add_keyboard(emz(menu_keyboard), (2, 2))
menu.add_action(menu_flush)
# Media state: choice between films and TV series; flush_context runs as its
# action.
media = State("media")
media.add_keyboard(emz([":film_projector: Film", ":television: Serie TV"]))
media.add_action(flush_context)
# Services state: choice between all services and the ":Italy: Migliori"
# option; flush_context runs as its action.
services = State("services")
services.add_keyboard(emz([":globe_showing_Europe-Africa: Tutti", ":Italy: Migliori"]))
services.add_action(flush_context)
# Searching state: declared with data_type=str and a text input;
# flush_context runs as its action.
searching = State("searching", data_type=str)
searching.add_text()
searching.add_action(flush_context)
# Show-list state: a dynamic list fed by get_media_list with custom left/right
# buttons; info_action produces the text and info_media_keyboard the keyboard.
# NOTE(review): set_long_task presumably shows its text while the state is
# being prepared - confirm against the autoconv documentation.
show_list = State("showlist", "@@@", back_button=False)
show_list.add_dynamic_list(
get_media_list,
left_button=emojize(":reverse_button:"),
right_button=emojize(":play_button:"),
)
show_list.add_action(info_action)
show_list.add_custom_keyboard(info_media_keyboard)
show_list.set_long_task(emz(":magnifying_glass_tilted_left: Searching..."))
# Dynamic routes
def media_routes(tdata):
    """Pick the state that follows ``media`` from the stored menu choice.

    Args:
        tdata: Object exposing ``sdata``, a mapping of stored choices.

    Returns:
        A tuple ``(None, target, menu)`` where ``target`` is ``searching``
        for a "Cerca" choice, ``show_list`` for a "Migliori" choice and
        ``services`` otherwise (including when no menu choice is stored).
    """
    choice = tdata.sdata.get("menu")
    if choice and search("Cerca", choice):
        target = searching
    elif choice and search("Migliori", choice):
        target = show_list
    else:
        target = services
    return (None, target, menu)
# Routes
# The conversation is built around the menu state, with the error state as
# fallback and the state texts loaded from texts.yaml.
conv = Conversation(menu, fallback_state=error, state_messages="texts.yaml")
# Defaults: label of the back button, emz applied as text function and
# Markdown passed as parse mode.
conv.set_defaults(
back_button=emojize(":BACK_arrow: Indietro"),
func=emz,
params={"parse_mode": "Markdown"},
)
conv.add_routes(error, default=menu)
# NOTE(review): key 1 presumably is the index of the second menu button
# ("Prossime Uscite"), which skips the media state - confirm.
conv.add_routes(menu, default=media, routes={1: show_list})
conv.add_routes(media, default=services, back=menu)
# media_routes additionally picks the state after media from the stored menu
# choice.
media.add_dynamic_routes(media_routes)
conv.add_routes(services, default=show_list, back=media)
conv.add_routes(searching, default=show_list, back=media)
# 99 is the callback data of the menu button built by info_media_keyboard.
conv.add_routes(show_list, default=show_list, routes={99: menu})
# Autoconv
# Rebinds the placeholder assigned at the top of this module.
states_handler = AutoConvHandler(conv, STATE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment