Skip to content

Instantly share code, notes, and snippets.

@JrooTJunior
Last active October 27, 2018 08:43
Show Gist options
  • Save JrooTJunior/7f21527238bf06570f5978b8eb671065 to your computer and use it in GitHub Desktop.
Save JrooTJunior/7f21527238bf06570f5978b8eb671065 to your computer and use it in GitHub Desktop.
Stickers download bot. License: MIT
"""
Requirements:
aiogram>=2.0.rc1
Pillow
emoji
"""
import asyncio
import io
import logging
import os
import zipfile
from PIL import Image
from aiogram import Bot, Dispatcher, executor, md, types
from aiogram.contrib.middlewares.logging import LoggingMiddleware
from aiogram.types import ChatActions, ParseMode
from aiogram.utils.emoji import demojize
from aiogram.utils.exceptions import BotBlocked, InvalidStickersSet
# Bot token is taken from the environment; a missing TOKEN variable raises KeyError at import time.
TOKEN = os.environ['TOKEN']
# Value passed to monitoring() in the __main__ guard: seconds slept between two counter checks.
TIMEOUT = 15
logging.basicConfig(level=logging.INFO)
log = logging.getLogger('bot')
# One event loop is shared by the bot and by the monitoring task scheduled in the __main__ guard.
loop = asyncio.get_event_loop()
bot = Bot(token=TOKEN, loop=loop, parse_mode=ParseMode.HTML)
dp = Dispatcher(bot)
dp.middleware.setup(LoggingMiddleware(log))
# Keys under which the runtime counters are stored on the bot object (bot[KEY]).
ACTIVE = 'active_downloads'
FAILED = 'failed_downloads'
COMPLETED = 'success_downloads'
UPLOADING = 'uploading'
# All counters start at zero; the handlers below increment/decrement them.
bot[ACTIVE] = 0
bot[FAILED] = 0
bot[COMPLETED] = 0
bot[UPLOADING] = 0
def webp_to_png(image: io.BytesIO) -> io.BytesIO:
    """Re-encode an in-memory image as PNG.

    :param image: buffer holding image data that ``Image.open`` can read
        (the caller in this file passes downloaded sticker data).
    :return: a new buffer containing the RGBA image saved as PNG,
        rewound to position 0 so it can be read immediately.
    """
    rgba_image = Image.open(image).convert('RGBA')
    png_buffer = io.BytesIO()
    rgba_image.save(png_buffer, format="png")
    # Rewind: save() leaves the cursor at the end of the written data.
    png_buffer.seek(0)
    return png_buffer
async def get_sticker_as_png(sticker) -> io.BytesIO:
    """Download a sticker into memory and return it converted to PNG.

    :param sticker: object exposing an awaitable
        ``download(destination, chunk_size=...)`` method (an aiogram sticker
        in this file).
    :return: buffer with the PNG data, as produced by ``webp_to_png``.
    """
    sticker_webp = io.BytesIO()
    await sticker.download(sticker_webp, chunk_size=2048)
    # The Pillow decode/encode is CPU-bound; running it in the default executor
    # keeps the event loop free to serve other updates while it works
    # (previously it ran inline, with only a sleep(0) yield before it).
    return await asyncio.get_event_loop().run_in_executor(None, webp_to_png, sticker_webp)
@dp.message_handler(types.ChatType.is_private, content_types=types.ContentType.STICKER)
async def sticker_handler(message: types.Message):
    """Reply to a sticker with a ZIP archive of its whole sticker set as PNG files.

    Flow: resolve the sticker set, download and convert every sticker into an
    in-memory ZIP archive, then upload the archive as a reply. The counters
    stored on ``bot`` (ACTIVE / UPLOADING / COMPLETED / FAILED) are updated
    along the way.

    :param message: incoming message; ``message.sticker`` is read for the set name.
    :return: the reply message when the set cannot be resolved, otherwise ``None``.
    :raises Exception: errors raised while uploading (other than ``BotBlocked``)
        are counted as failures and re-raised.
    """
    set_name = message.sticker.set_name
    if not set_name:
        # The Bot API marks set_name as optional: a sticker may belong to no set.
        return await message.reply('Invalid stickers set!')

    try:
        stickers_set = await bot.get_sticker_set(set_name)
    except InvalidStickersSet:
        return await message.reply('Invalid stickers set!')

    await message.reply('Downloading stickers set... \n🕑 It can take some time.')

    archive_zip = io.BytesIO()
    bot[ACTIVE] += 1
    try:
        with zipfile.ZipFile(archive_zip, "a", zipfile.ZIP_DEFLATED, False) as archive:
            for index, sticker in enumerate(stickers_set.stickers):
                # emoji is optional in the Bot API; fall back to an empty name part.
                sticker_name = demojize(sticker.emoji or '').replace('::', '_').replace(':', '')
                filename = f"{index:03}_{sticker_name}"
                log.info(f"Download sticker '{sticker.file_id}' from '{stickers_set.name}' as '{filename}'")

                sticker_png = await get_sticker_as_png(sticker)
                archive.writestr(f"{filename}.png", sticker_png.read())
                # Drop the PNG buffer before the next download to keep peak memory low.
                del sticker_png
    except Exception:
        # `except Exception` (not bare `except`) lets task cancellation and
        # interpreter-exit signals propagate instead of being swallowed.
        # Record the failure first so it is not lost if the reply itself fails.
        log.exception('Failed to download stickers')
        bot[FAILED] += 1
        await message.reply('Something went wrong! 😔')
    else:
        log.info(f"Start uploading '{stickers_set.name}' for {message.from_user.id}")
        await ChatActions.upload_document()
        archive_zip.seek(0)

        caption = ('🏞 ' +
                   md.hbold('Stickers set:') + ' ' + md.hcode(stickers_set.title) + '\n' +
                   md.hbold('Stickers in set:') + ' ' + md.hcode(len(stickers_set.stickers)))

        bot[UPLOADING] += 1
        try:
            await message.reply_document(types.InputFile(archive_zip, filename=f'{stickers_set.name}.zip'),
                                         caption)
        except BotBlocked:
            bot[FAILED] += 1
            log.warning(f"Can't upload file for {message.from_user.id} because the bot was blocked by the user.")
        except Exception:
            # Any other upload error is still a failed job; count it, then let it propagate.
            bot[FAILED] += 1
            raise
        else:
            bot[COMPLETED] += 1
        finally:
            bot[UPLOADING] -= 1
    finally:
        bot[ACTIVE] -= 1
        if bot[ACTIVE]:
            log.info(f"Sticker set '{stickers_set.name}' completed. Active downloads count: {bot[ACTIVE]}")
        else:
            log.info('All downloads finished!')
        del archive_zip
@dp.message_handler(commands=['status'], commands_prefix='!')
async def cmd_status(message: types.Message):
    """Reply with the current values of the four counters stored on ``bot``.

    Registered for the ``status`` command with the ``!`` prefix.
    """
    report_lines = (
        f'Active sticker sets: {bot[ACTIVE]}',
        f'Active uploads: {bot[UPLOADING]}',
        f'Completed: {bot[COMPLETED]}',
        f'Failed: {bot[FAILED]}',
    )
    await message.reply('\n'.join(report_lines))
def collect_counters():
    """Return the current counters as a tuple: (active, uploading, completed, failed)."""
    counter_keys = (ACTIVE, UPLOADING, COMPLETED, FAILED)
    return tuple(bot[key] for key in counter_keys)
async def monitoring(timeout):
    """Periodically log the counters, but only when they changed since the last check.

    Runs for as long as the module-level ``loop`` reports it is running.

    :param timeout: seconds to sleep between two consecutive checks.
    """
    last_seen = collect_counters()
    while loop.is_running():
        snapshot = collect_counters()
        if snapshot != last_seen:
            last_seen = snapshot
            # collect_counters() yields (active, uploading, completed, failed).
            active, uploading, completed, failed = snapshot
            log.info(f'Active sticker sets: {active}. '
                     f'Active uploads: {uploading}. '
                     f'Completed: {completed}. '
                     f'Failed: {failed}.')
        await asyncio.sleep(timeout)
if __name__ == '__main__':
    # Schedule the counter logger on the shared loop before polling starts,
    # so it begins running as soon as the executor runs the loop.
    _monitor_coro = monitoring(TIMEOUT)
    loop.create_task(_monitor_coro)
    executor.start_polling(dp, skip_updates=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment