Skip to content

Instantly share code, notes, and snippets.

@GustavoGLD
Created February 3, 2023 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GustavoGLD/b5d103ad0e23fccad5262c401c3c29a1 to your computer and use it in GitHub Desktop.
Save GustavoGLD/b5d103ad0e23fccad5262c401c3c29a1 to your computer and use it in GitHub Desktop.
Achar no Youtube - Telegram Bot
import logging
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
ConversationHandler,
MessageHandler,
filters,
)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.remote.webdriver import WebElement, WebDriver
from selenium.webdriver.support import expected_conditions as EC
import os
from dotenv import load_dotenv
load_dotenv()
# Enable logging
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO
)
logger = logging.getLogger(__name__)
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
message = await update.message.reply_text("Preparando o navegador...")
context.user_data["driver"] = webdriver.Firefox()
await message.edit_text("Digite o que quer buscar!")
context.user_data["driver"].get("https://www.youtube.com/")
return 1
async def typing(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
keyboard: list[list[InlineKeyboardButton]] = []
message = await update.message.reply_text("Pesquisando...")
search_bar = context.user_data["driver"].find_element(By.XPATH, "//input[@id='search']")
search_bar.click()
search_bar.send_keys(update.message.text)
presentations: list[WebElement] = WebDriverWait(context.user_data["driver"], 5).until(
EC.presence_of_all_elements_located((By.XPATH, "//li[@role='presentation']"))
)
keyboard.append([InlineKeyboardButton(update.message.text, callback_data=update.message.text)])
for present in presentations:
keyboard.append([InlineKeyboardButton(present.text, callback_data=present.text)])
keyboard.pop()
reply_markup = InlineKeyboardMarkup(keyboard)
await message.edit_text("Possíveis resultados de pesquisa:", reply_markup=reply_markup)
return 2
async def searching(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
await query.edit_message_text(text=f'Pesquisando por "{" ".join(query.data.split())}"...')
context.user_data["driver"].get("https://www.youtube.com/results?search_query=" + "+".join(query.data.split()))
await query.edit_message_text(f"Deseja filtro por Data de Upload?", reply_markup=InlineKeyboardMarkup([
[InlineKeyboardButton("Sim", callback_data="True"), InlineKeyboardButton("Não", callback_data="False")],
]))
return 3
async def get_result(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
driver: WebDriver = context.user_data["driver"]
video: WebElement = WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.XPATH, "/html/body/ytd-app/div[1]/ytd-page-manager/ytd-search/div[1]/ytd-two-column-search-results-renderer/div/ytd-section-list-renderer/div[2]/ytd-item-section-renderer/div[3]/ytd-video-renderer[1]"))
).find_element(By.ID, 'thumbnail')
await context.bot.send_message(chat_id=update.effective_chat.id, text=video.get_attribute('href'))
return ConversationHandler.END
async def asking_filter(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
if query.data == "False":
return await get_result(update, context)
filters = ["Última hora", "Hoje", "Esta semana", "Este mês", "Este ano"]
keyboard: list[list[InlineKeyboardButton]] = []
for n in range(len(filters)):
keyboard.append([InlineKeyboardButton(filters[n], callback_data=str(n))])
await query.message.edit_text("Selecione o filtro:", reply_markup=InlineKeyboardMarkup(keyboard))
return 4
async def filtering(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
query = update.callback_query
await query.answer()
driver: WebDriver = context.user_data["driver"]
driver.find_element(By.XPATH, f"//button[@aria-label='Filtros de enquete']").click()
WebDriverWait(driver, 5).until(
EC.presence_of_all_elements_located((By.XPATH, f"//ytd-search-filter-renderer"))
)[int(query.data)].find_element(By.ID, 'endpoint').click()
await query.message.edit_text("Encontramos um vídeo para você!")
return await get_result(update, context)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Use /start para começarmos!")
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user = update.message.from_user
logger.info("User %s canceled the conversation.", user.first_name)
await update.message.reply_text(
"Volte sempre!"
)
return ConversationHandler.END
def main() -> None:
application = Application.builder().token(os.environ['TOKEN']).build()
conv_handler = ConversationHandler(
entry_points=[CommandHandler("start", start)],
states={
1: [MessageHandler(filters.TEXT, typing)],
2: [CallbackQueryHandler(searching)],
3: [CallbackQueryHandler(asking_filter)],
4: [CallbackQueryHandler(filtering)],
},
fallbacks=[CommandHandler("cancel", cancel)]
)
application.add_handler(conv_handler)
application.add_handler(CommandHandler("help", help_command))
application.run_polling()
if __name__ == "__main__":
main()
@GustavoGLD
Copy link
Author

Uma aplicação de rascunho feita rapidamente para explorar o potencial do python-telegram-bot junto com selenium

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment