Skip to content

Instantly share code, notes, and snippets.

@stek29
Last active July 13, 2022 05:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stek29/26d3573752720015b0f0b183d3212afe to your computer and use it in GitHub Desktop.
Save stek29/26d3573752720015b0f0b183d3212afe to your computer and use it in GitHub Desktop.
t.me/stickerclonerbot -- Can somebody host this?
import requests
import io
from PIL import Image
import re
from queue import Queue
import threading
class BotException(Exception):
pass
class Bot:
API_URL = 'https://api.telegram.org/'
def __init__(self, token, debug=False):
self.DEBUG = debug
self.token = token
self.base_url = self.API_URL + 'bot' + token + '/'
self.file_url = self.API_URL + 'file/bot' + token + '/'
self.me = self.execute('getMe')
def execute_raw(self, method, params=None, files=None):
if files is None:
r = requests.get(self.base_url + method, params=params)
else:
r = requests.post(self.base_url + method, params=params,
files=files)
if self.DEBUG:
print(r.url)
return r.json()
def execute(self, method, params=None, files=None):
result = self.execute_raw(method, params, files)
if result['ok'] != True:
raise BotException('Err: '+str(result))
return result['result']
def get_file_url_by_id(self, file_id):
file_info = self.execute('getFile', {'file_id': file_id})
return self.get_file_url_by_path(file_info['file_path'])
def get_file_url_by_path(self, path):
return self.file_url + path
class ClonerThread(threading.Thread):
def __init__(self, bot, queue):
super().__init__()
self.bot = bot
self.queue = queue
def webp_to_png(self, as_user, file_id):
bot = self.bot
# get webp
url = bot.get_file_url_by_id(file_id)
r = requests.get(url, stream=True)
r.decode_content = True
# convert to png
img = Image.open(r.raw)
f = io.BytesIO()
img.save(f, format='png')
# upload png
f.seek(0, 0)
uploaded_file = bot.execute('uploadStickerFile',
{'user_id': as_user},
{'png_sticker': f}
)
return uploaded_file['file_id']
def clone_sticker_set(self, name, new_name, as_user):
bot = self.bot
if bot.DEBUG:
print('clone_sticker_set(', name, as_user)
def gen_file(file_id):
return self.webp_to_png(as_user, file_id)
sset = bot.execute('getStickerSet', {'name': name})
new_set = bot.execute('createNewStickerSet', {
'user_id': as_user,
'name': new_name,
'title': sset['title'],
'png_sticker': gen_file(sset['stickers'][0]['file_id']),
'emojis': sset['stickers'][0]['emoji']
})
assert new_set == True
bot.log('Created: %s as %s'%(new_name, as_user))
del sset['stickers'][0]
for sticker in sset['stickers']:
bot.execute('addStickerToSet', {
'user_id': as_user,
'name': new_name,
'png_sticker': gen_file(sticker['file_id']),
'emojis': sticker['emoji']
})
def run(self):
while True:
# name, new_name, as_user, chat_id, reply_to
job = self.queue.get()
reply_to = job[4]
reply_to = self.bot.execute('sendMessage', {
'chat_id': job[3],
'text': 'Started!',
'reply_to_message_id': reply_to
})['message_id']
try:
self.clone_sticker_set(job[0], job[1], job[2])
msg = 'Done!'
except BotException as e:
msg = str(e)
except Exception as e:
print(e)
msg = 'Error'
self.bot.execute('sendMessage', {
'chat_id': job[3],
'text': msg,
'reply_to_message_id': reply_to
})
self.queue.task_done()
class ClonerBot(Bot):
def __init__(self, token, nthreads=3, timeout=15, debug=False, log_chat_id=None):
super().__init__(token=token, debug=debug)
self.timeout = timeout
self.log_chat_id = log_chat_id
self.queue = Queue()
for _ in range(nthreads):
th = ClonerThread(self, self.queue)
th.setDaemon(True)
th.start()
def log(self, msg):
print('LOG:', msg)
if self.log_chat_id is not None:
try:
self.execute('sendMessage', {
'chat_id': self.log_chat_id,
'text': msg
})
except BotException as e:
print('LOG FAILED:', e)
def clone_sticker_set_safe(self, name, as_user, reply_to, chat_id):
if self.DEBUG:
print('clone_sticker_set_safe(', name, as_user, reply_to,
chat_id)
if name.lower().endswith('_by_'+self.me['username'].lower()):
self.execute('sendMessage', {
'text': 'Can\'t clone the clone!',
'chat_id': chat_id,
'reply_to_message_id': reply_to
})
return
new_name = '{}_by_{}'.format(name, self.me['username'])
try:
sset = self.execute('getStickerSet', {'name': new_name})
# should raise by now
self.execute('sendMessage', {
'text': 'Was cloned already to '\
't.me/addstickers/%s'%new_name,
'chat_id': chat_id,
'reply_to_message_id': reply_to
})
return
except BotException:
pass
reply_to = self.execute('sendMessage', {
'chat_id': chat_id,
'text': 'Cloning to t.me/addstickers/%s.\n'\
'%d in queue.'%(new_name, self.queue.qsize()+1),
'reply_to_message_id': reply_to
})['message_id']
self.queue.put((name, new_name, as_user, chat_id, reply_to))
def start_clone_polling(self):
offset = 0
while True:
updates = self.execute('getUpdates', {
'offset': offset,
'timeout': self.timeout,
'allowed_updates': ['message']
})
for update in updates:
offset = max(update['update_id'] + 1, offset)
message = update.get('message')
if message is None:
continue
self.process_message(message)
def process_message(self, message):
from_user = message.get('from', {}).get('id')
from_chat = message['chat']['id']
mid = message['message_id']
text = message.get('text', '').strip()
if text.startswith('/start') or text.startswith('/help'):
self.execute('sendMessage', {
'text': 'Send me a sticker or addstickers' \
' link to clone pack\n'\
'See also: /report',
'chat_id': from_chat,
'reply_to_message_id': mid
})
return
if text.startswith('/report'):
match = re.match(r'/report\s+([a-zA-Z\d_]+).*', text)
if not match:
msg = '\n'.join((
'Usage: `/report pack_by_%s`'%self.me['username'],
'Report pack to admin',
'Use if pack is outdated or something '\
'else is wrong with it'
))
else:
sname = match.group(1)
if sname.lower().endswith(self.me['username'].lower()):
self.log('Report: chat:%s user:%s id:%s pack:%s'%(
from_chat, from_user, mid, match.group(1)
))
msg = 'Reported to admin!'
else:
msg = 'Cant report pack \'%s\'!'
self.execute('sendMessage', {
'text': msg,
'chat_id': from_chat,
'reply_to_message_id': mid
})
return
sname = ''
match = re.search(r'/addstickers/([a-zA-Z\d_]+)', text)
if match:
sname = match.group(1)
sticker = message.get('sticker')
if sticker is not None:
set_name = sticker.get('set_name')
if set_name is not None:
sname = set_name
else:
self.execute('sendMessage', {
'text': 'Sticker doesn\'t belong to pack',
'chat_id': from_chat,
'reply_to_message_id': mid
})
if sname:
self.clone_sticker_set_safe(sname, from_user, mid,
from_chat)
@Atharv3142M
Copy link

How to host this
Pls tell

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment