Skip to content

Instantly share code, notes, and snippets.

@Poolitzer
Last active September 8, 2022 18:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Poolitzer/05281c7f94d96701538eff0799aee1cd to your computer and use it in GitHub Desktop.
Save Poolitzer/05281c7f94d96701538eff0799aee1cd to your computer and use it in GitHub Desktop.
Running a blocking I/O task with PTB v20+ while updates are still being handled.
# this example is based on https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
import asyncio
import time
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes, MessageHandler, filters
def blocking_io(delay: float = 10) -> str:
    """Simulate a blocking I/O operation and return a fixed marker string.

    Blocking calls such as file operations (e.g. logging) stall the asyncio
    event loop, so this function is meant to be run in a thread pool rather
    than called directly from a coroutine.

    Args:
        delay: Seconds to block for. Defaults to 10, the previously
            hard-coded duration, so zero-argument callers are unaffected.

    Returns:
        The string ``"wuhu"`` once the delay has elapsed.
    """
    time.sleep(delay)
    return "wuhu"
# this coroutine is run as a background task; it hands the blocking call to a thread pool and replies with the result
async def ytb_dl_reply(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Run ``blocking_io`` off the event loop and reply with its result.

    Args:
        update: The update that triggered the task; the reply is sent to
            its effective message.
        context: The handler context (unused here, kept for the standard
            callback signature).
    """
    loop = asyncio.get_running_loop()
    # Passing None as the executor uses the loop's default executor, so the
    # blocking call does not stall the event loop while it runs.
    result = await loop.run_in_executor(None, blocking_io)
    # effective_message is also populated for edited messages and channel
    # posts, where update.message is None and would raise AttributeError.
    await update.effective_message.reply_text(result)
# start will reply with a message and start the background task
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Send a message when the command /start is issued.

    Replies immediately, then schedules ``ytb_dl_reply`` as a background
    task so this handler returns without waiting for the slow work.

    Args:
        update: The incoming update carrying the /start command.
        context: The handler context; its application schedules the task.
    """
    # effective_message is used instead of update.message because the
    # latter is None when the command arrives as an edited message.
    await update.effective_message.reply_html("I will reply in 10 seconds!")
    # this will run the task in the background and even await on it on shutdown
    # application is neat
    context.application.create_task(ytb_dl_reply(update, context), update=update)
# showing that it actually runs in the background, the bot will reply to these instantly even if the thread runs
async def echo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Echo the user message.

    Args:
        update: The incoming update whose text is echoed back.
        context: The handler context (unused here, kept for the standard
            callback signature).
    """
    # update.message is None for edited messages and channel posts;
    # effective_message covers those update types as well.
    message = update.effective_message
    await message.reply_text(message.text)
def main() -> None:
    """Build the bot application, register its handlers and poll for updates."""
    # Build the Application from the bot token.
    bot_app = Application.builder().token("TOKEN").build()

    handlers = (
        # /start: immediate reply plus the background task
        CommandHandler("start", start),
        # any non-command text message: echoed back
        MessageHandler(filters.TEXT & ~filters.COMMAND, echo),
    )
    for handler in handlers:
        bot_app.add_handler(handler)

    # Keep polling until the user presses Ctrl-C.
    bot_app.run_polling()
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment