Last active
June 18, 2022 16:31
-
-
Save YadominJinta/6eb74a047d8f1f19e28b9082b063e553 to your computer and use it in GitHub Desktop.
Telegram bot that downloads video stickers, regular stickers, and GIFs, converts them, and sends back the result.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import os | |
import subprocess | |
from aiogram import Bot, Dispatcher, types | |
from zipfile import ZipFile | |
# Token handed to Bot(); left empty in this file.
BOT_TOKEN = ""

# File extensions used to build on-disk file names.
WEBM_EXT = '.webm'
WEBP_EXT = '.webp'
GIF_EXT = '.gif'
PNG_EXT = '.png'
ZIP_EXT = '.zip'
MP4_EXT = '.mp4'

bot = Bot(BOT_TOKEN)
dp = Dispatcher(bot)

# expanduser('~') still honours $HOME when it is set, but unlike
# os.getenv('HOME') it does not return None when HOME is missing (e.g. on
# Windows), which previously made os.path.join raise TypeError at import.
download_path = os.path.join(os.path.expanduser('~'), 'Downloads', 'tg-bot')
def get_file_path(file_id: str, extension: str):
    """Return the path of ``file_id + extension`` inside ``download_path``."""
    file_name = file_id + extension
    return os.path.join(download_path, file_name)
def convert_file(file_id: str, origin_ext: str, convert_ext: str):
    """Convert a stored file to another format by invoking ffmpeg.

    The input is ``get_file_path(file_id, origin_ext)`` and the output is
    ``get_file_path(file_id, convert_ext)``.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with a non-zero status.
        FileNotFoundError: if the ``ffmpeg`` executable cannot be found.
    """
    subprocess.run(
        [
            'ffmpeg',
            # Never read from stdin: the bot runs unattended, and an
            # interactive prompt would block it indefinitely.
            '-nostdin',
            # Overwrite a stale output instead of asking for confirmation.
            '-y',
            '-i', get_file_path(file_id, origin_ext),
            get_file_path(file_id, convert_ext),
        ],
        # Surface conversion failures here rather than as a confusing
        # missing-file error in a later step.
        check=True,
    )
def archive_file(file_id: str, origin_ext: str, convert_ext: str):
    """Bundle the original and the converted file into one zip archive.

    The archive is written to ``get_file_path(file_id, ZIP_EXT)`` and holds
    two members named ``file_id + origin_ext`` and ``file_id + convert_ext``.
    An existing archive at that path is replaced.

    Raises:
        OSError: if either input file is missing or the archive cannot be
            written; a partially written archive is removed first.
    """
    zip_path = get_file_path(file_id, ZIP_EXT)
    try:
        # 'w' instead of 'x': rebuilding an archive must not fail just
        # because a previous one is already on disk.
        with ZipFile(zip_path, 'w') as archive:
            for ext in (origin_ext, convert_ext):
                archive.write(get_file_path(file_id, ext), file_id + ext)
    except Exception:
        # Do not leave a truncated archive behind; callers treat the mere
        # existence of the zip as "ready to send".
        try:
            os.remove(zip_path)
        except OSError:
            pass  # best-effort cleanup; the original error is re-raised below
        raise
def check_file_downloaded(file_id: str, origin_ext: str, convert_ext: str) -> bool:
    """Report whether the source file is on disk, building missing outputs.

    If the source file exists, the converted file and the zip archive are
    created when they are not present yet.

    Returns:
        False if the source file does not exist (nothing else is done);
        True once the source, converted file and archive are all in place.
    """
    if not os.path.exists(get_file_path(file_id, origin_ext)):
        return False
    if not os.path.exists(get_file_path(file_id, convert_ext)):
        convert_file(file_id, origin_ext, convert_ext)
    if not os.path.exists(get_file_path(file_id, ZIP_EXT)):
        archive_file(file_id, origin_ext, convert_ext)
    # The function used to fall off the end here and return None, so the
    # caller's ``if not check_file_downloaded(...)`` was always true and
    # every request was downloaded and converted again.
    return True
@dp.message_handler(content_types=types.ContentType.ANY)
async def message_handler(event: types.Message):
    """Reply to a sticker or MP4 document with a zip of it and its conversion.

    Video stickers are handled as .webm -> .gif, other non-animated stickers
    as .webp -> .png, and ``video/mp4`` documents as .mp4 -> .gif. Animated
    (non-video) stickers and other document types get a refusal reply;
    messages with neither a sticker nor a document are ignored.
    """
    sticker = event.sticker
    if sticker is not None:
        file_id = sticker.file_id
        # Video is checked first, so the animated refusal only applies to
        # stickers that are not video stickers.
        if not sticker.is_video and sticker.is_animated:
            await event.reply('Not support animated sticker')
            return
        if sticker.is_video:
            origin_ext, convert_ext = WEBM_EXT, GIF_EXT
        else:
            origin_ext, convert_ext = WEBP_EXT, PNG_EXT
    else:
        document = event.document
        if document is None:
            return
        file_id = document.file_id
        if document.mime_type != 'video/mp4':
            await event.reply('Not support file type')
            return
        origin_ext, convert_ext = MP4_EXT, GIF_EXT

    await event.reply("Processing")

    # A falsy result means the files have to be fetched and built here.
    already_available = check_file_downloaded(file_id, origin_ext, convert_ext)
    if not already_available:
        source_path = get_file_path(file_id, origin_ext)
        await bot.download_file_by_id(file_id, destination=source_path)
        convert_file(file_id, origin_ext, convert_ext)
        archive_file(file_id, origin_ext, convert_ext)

    archive = types.InputFile(get_file_path(file_id, ZIP_EXT))
    await event.reply_document(archive, disable_content_type_detection=True)
async def main():
    """Register the message handler, poll for updates, then shut down.

    The shutdown calls run whether polling returns normally or raises.
    """
    try:
        # NOTE(review): message_handler is also decorated with
        # @dp.message_handler, so this looks like a second registration;
        # confirm whether it is still needed.
        dp.register_message_handler(message_handler, state='*')
        await dp.start_polling()
    finally:
        await dp.wait_closed()
        await bot.close()
if __name__ == "__main__":
    # Start the bot only when executed as a script, so importing this
    # module (e.g. to reuse the helpers) does not begin polling.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Support:
Not Support: