Skip to content

Instantly share code, notes, and snippets.

@kraftwerk28
Created May 19, 2022 20:45
Show Gist options
  • Save kraftwerk28/290185b0248e818007ac55f8be01a02b to your computer and use it in GitHub Desktop.
Save kraftwerk28/290185b0248e818007ac55f8be01a02b to your computer and use it in GitHub Desktop.
import emoji
import gzip
from io import BytesIO
import json
import logging
from pyrogram import filters, types
from pyrogram.raw.functions.messages import GetStickerSet
from pyrogram.raw.functions.upload import GetFile
from pyrogram.raw.types import (
InputStickerSetAnimatedEmoji, Document,
StickerSet, InputDocumentFileLocation,
)
# Module-level logger, named after this module so its records can be filtered
# or configured independently by the application's logging setup.
log = logging.getLogger(__name__)
# Multiplier applied to "y" values when the command carries no explicit amount.
_DEFAULT_AMPLIFY_AMOUNT = 8
# MIME type checked on the sent message to detect a rejected animated sticker.
_BAD_TGS_MIME = "application/x-bad-tgsticker"


def _decode_tgs(raw):
    """Return the JSON value stored in a gzip-compressed payload, or None.

    None is returned when *raw* is not valid gzip data or does not contain
    valid JSON, so callers can bail out instead of crashing the handler.
    """
    import zlib  # local import: only needed for its exception type

    try:
        return json.loads(gzip.decompress(raw))
    except (OSError, EOFError, ValueError, zlib.error):
        # OSError covers gzip.BadGzipFile; ValueError covers JSON and
        # UTF-8 decoding errors; EOFError/zlib.error cover truncated or
        # corrupt streams.
        return None


def _scale_y(node, factor):
    """Multiply every numeric value stored under key "y" by *factor*, in place.

    Walks nested dicts and lists recursively. Exact type checks are used on
    purpose so that booleans (a subclass of int) are never scaled.
    """
    if type(node) is dict:
        for key, value in node.items():
            if type(value) in (float, int):
                # Only "y" is amplified; "x" and all other numbers stay as-is.
                if key == "y":
                    node[key] = value * factor
            else:
                _scale_y(value, factor)
    elif type(node) is list:
        for item in node:
            _scale_y(item, factor)


@app.on_message(
    filters.reply &
    filters.regex(
        # Groups: 1 = amount, 2 = literal "reply", 3 = emoji index.
        # NOTE(review): the leading "?" on the last fragment makes the
        # "reply" group lazy ("??"); matching results are unaffected.
        r"^\.amp(?:l(?:i(?:fy?)?)?)?"
        r"(?:\s+(-?\d+))?"
        r"(?:\s+(reply))?"
        r"?(?:\s+(-?\d+))?$",
    )
)
async def on_amplify_sticker(client: Client, message: types.Message):
    """Handle ``.amp[lify] [amount] [reply] [n]`` sent as a reply.

    The replied-to message supplies the animation: either its animated
    sticker / document, or the animated-emoji sticker for the n-th emoji
    (1-based, default 1) found in its text or caption. Every numeric "y"
    value in the decoded JSON is multiplied by ``amount`` (default 8) and
    the result is sent back as a sticker.

    With the ``reply`` keyword the result replies to the original message;
    otherwise it replies to the command message. If the sent message comes
    back as a document with the bad-sticker MIME type, it is deleted again.
    """
    # if message.from_user is not None and message.from_user.is_self:
    # await client.remember_my_reply(message)
    log.info("Amplify command")
    re_match, = message.matches
    reply = message.reply_to_message
    if reply is None:
        # Nothing to amplify if the replied-to message is unavailable.
        return

    # Group 1 only ever matches digits, so int() cannot fail once it is set.
    if re_match[1] is not None:
        amplify_amount = int(re_match[1])
    else:
        amplify_amount = _DEFAULT_AMPLIFY_AMOUNT
    log.info(f"Amount: {amplify_amount}")

    reply_to_msg_id = message.id
    if re_match[2] == "reply":
        reply_to_msg_id = reply.id

    # from_user may be None (e.g. no user sender), so guard before is_self.
    # NOTE(review): without "reply", the result targets the command message
    # that is deleted here — confirm this is the intended behaviour.
    sender = message.from_user
    if sender is not None and sender.is_self:
        await message.delete()

    text = reply.text if reply.text is not None else reply.caption
    if (reply.sticker and reply.sticker.is_animated) or reply.document:
        log.info("Amplifying sticker...")
        sticker_io = await client.download_media(reply, in_memory=True)
        if sticker_io is None:
            log.info("Could not download media from the replied message.")
            return
        sticker_filename = sticker_io.name
        sticker_io.seek(0)
        tgs_json = _decode_tgs(sticker_io.read())
    elif text is not None:
        # emoji >= 2.0 renamed emoji_lis() to emoji_list(); both return
        # dicts with an "emoji" key. Prefer the new name, fall back to old.
        find_emojis = getattr(emoji, "emoji_list", None) or emoji.emoji_lis
        msg_emojis = find_emojis(text)
        log.info(
            "Emoji in the message: "
            f"{','.join(e['emoji'] for e in msg_emojis)}"
        )
        nth_emoji = 1
        if re_match[3] is not None:
            nth_emoji = int(re_match[3])
        try:
            target_emoji = msg_emojis[nth_emoji - 1]["emoji"]
        except IndexError:
            return
        log.info(f"Amplifying emoji #{nth_emoji} ({target_emoji})")

        # Look the emoji up in the animated-emoji sticker set.
        t = InputStickerSetAnimatedEmoji()
        req = GetStickerSet(stickerset=t, hash=0)
        stickerset: StickerSet = await app.invoke(req)
        pack = next(
            (p for p in stickerset.packs if p.emoticon == target_emoji),
            None
        )
        if pack is None or not pack.documents:
            log.info(f"Pack for this {target_emoji} not found.")
            return
        wanted_id = pack.documents[0]
        sticker_doc: Document = next(
            (d for d in stickerset.documents if d.id == wanted_id),
            None
        )
        if sticker_doc is None:
            return

        # Fetch the raw sticker file through a media session for its DC.
        input_doc = InputDocumentFileLocation(
            id=sticker_doc.id,
            access_hash=sticker_doc.access_hash,
            file_reference=sticker_doc.file_reference,
            thumb_size="",
        )
        req = GetFile(location=input_doc, offset=0, limit=0x100000)
        msession = await app.media_session_for_dc(sticker_doc.dc_id)
        sticker_file = await msession.invoke(req)
        tgs_json = _decode_tgs(sticker_file.bytes)
        sticker_filename = "AnimatedSticker.tgs"
    else:
        return

    if tgs_json is None:
        log.info("Payload is not a gzip-compressed JSON sticker; skipping.")
        return

    _scale_y(tgs_json, amplify_amount)

    tgs_result = gzip.compress(json.dumps(tgs_json).encode("utf-8"))
    result_io = BytesIO(tgs_result)
    result_io.name = sticker_filename
    sent = await client.send_sticker(
        message.chat.id,
        result_io,
        reply_to_message_id=reply_to_msg_id,
    )
    # `document` is optional on the returned message, so check it before
    # reading its MIME type.
    if (
        sent is not None
        and sent.document is not None
        and sent.document.mime_type == _BAD_TGS_MIME
    ):
        await sent.delete()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment