Last active
July 13, 2022 05:00
-
-
Save stek29/26d3573752720015b0f0b183d3212afe to your computer and use it in GitHub Desktop.
t.me/stickerclonerbot -- Can somebody host this?
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import io | |
from PIL import Image | |
import re | |
from queue import Queue | |
import threading | |
class BotException(Exception):
    """Error raised when a Telegram Bot API call does not report success."""
class Bot:
    """Minimal Telegram Bot API client built on ``requests``.

    Attributes:
        DEBUG: when true, every request URL is printed (token redacted).
        token: the bot token passed to the constructor.
        base_url: prefix for API method calls (``.../bot<token>/``).
        file_url: prefix for file downloads (``.../file/bot<token>/``).
        me: result of the ``getMe`` call made at construction time.
    """

    API_URL = 'https://api.telegram.org/'

    def __init__(self, token, debug=False):
        """Store the token, derive the endpoint URLs and call ``getMe``.

        Raises:
            BotException: if the ``getMe`` call is not answered with ``ok``.
        """
        self.DEBUG = debug
        self.token = token
        self.base_url = self.API_URL + 'bot' + token + '/'
        self.file_url = self.API_URL + 'file/bot' + token + '/'
        self.me = self.execute('getMe')

    def execute_raw(self, method, params=None, files=None):
        """Call API ``method`` and return the decoded JSON response as-is.

        A GET request is used unless ``files`` is given, in which case the
        files are uploaded with a multipart POST. ``params`` is always sent
        in the query string.
        """
        url = self.base_url + method
        if files is None:
            r = requests.get(url, params=params)
        else:
            r = requests.post(url, params=params, files=files)
        if self.DEBUG:
            # The request URL embeds the bot token; never print it verbatim.
            print(r.url.replace(self.token, '<token>'))
        return r.json()

    def execute(self, method, params=None, files=None):
        """Call API ``method`` and return the ``result`` field of the reply.

        Raises:
            BotException: if the reply's ``ok`` field is missing or false.
        """
        result = self.execute_raw(method, params, files)
        if not result.get('ok'):
            raise BotException('Err: '+str(result))
        return result['result']

    def get_file_url_by_id(self, file_id):
        """Resolve ``file_id`` through ``getFile`` and return its download URL."""
        file_info = self.execute('getFile', {'file_id': file_id})
        return self.get_file_url_by_path(file_info['file_path'])

    def get_file_url_by_path(self, path):
        """Return the download URL for a ``file_path`` reported by the API."""
        return self.file_url + path
class ClonerThread(threading.Thread):
    """Worker thread that clones sticker sets taken from a shared queue.

    Every queue item is a tuple
    ``(name, new_name, as_user, chat_id, reply_to)``.

    ``bot`` must provide ``execute``, ``get_file_url_by_id``, ``DEBUG`` and
    ``log`` (the latter is called after a set has been created).
    """

    def __init__(self, bot, queue):
        super().__init__()
        self.bot = bot
        self.queue = queue

    def webp_to_png(self, as_user, file_id):
        """Download a sticker, convert it to PNG and upload it for ``as_user``.

        Returns:
            The ``file_id`` of the uploaded PNG as reported by
            ``uploadStickerFile``.

        Raises:
            requests.HTTPError: if the sticker download returns an HTTP error.
            BotException: if one of the API calls fails.
        """
        bot = self.bot

        # Download the whole sticker body. Reading ``content`` applies any
        # transfer decoding and releases the connection back to the pool,
        # which the previous streamed ``raw`` access did not do.
        url = bot.get_file_url_by_id(file_id)
        response = requests.get(url)
        response.raise_for_status()

        # Re-encode as PNG into an in-memory buffer.
        img = Image.open(io.BytesIO(response.content))
        png = io.BytesIO()
        img.save(png, format='png')

        # Rewind so the upload starts from the first byte.
        png.seek(0)
        uploaded_file = bot.execute('uploadStickerFile',
                                    {'user_id': as_user},
                                    {'png_sticker': png})
        return uploaded_file['file_id']

    def clone_sticker_set(self, name, new_name, as_user):
        """Copy every sticker of set ``name`` into a new set ``new_name``.

        The new set is created with the first sticker of the source set (and
        the source set's title); the remaining stickers are then added one by
        one in their original order.

        Raises:
            BotException: if an API call fails or ``createNewStickerSet``
                does not return ``True``.
        """
        bot = self.bot
        if bot.DEBUG:
            print('clone_sticker_set(', name, as_user)

        sset = bot.execute('getStickerSet', {'name': name})
        stickers = sset['stickers']
        first = stickers[0]

        created = bot.execute('createNewStickerSet', {
            'user_id': as_user,
            'name': new_name,
            'title': sset['title'],
            'png_sticker': self.webp_to_png(as_user, first['file_id']),
            'emojis': first['emoji'],
        })
        # Explicit check instead of ``assert``: asserts vanish under ``-O``.
        if created is not True:
            raise BotException(
                'Err: createNewStickerSet returned ' + repr(created))
        bot.log('Created: %s as %s'%(new_name, as_user))

        for sticker in stickers[1:]:
            bot.execute('addStickerToSet', {
                'user_id': as_user,
                'name': new_name,
                'png_sticker': self.webp_to_png(as_user, sticker['file_id']),
                'emojis': sticker['emoji'],
            })

    def run(self):
        """Process queued jobs forever.

        A failure while handling one job (including a failure to send the
        status messages) is printed and does not stop the worker, and
        ``task_done`` is always called for the job.
        """
        while True:
            job = self.queue.get()
            try:
                self._handle_job(job)
            except Exception as e:
                # Top-level boundary of the worker: keep the thread alive.
                print(e)
            finally:
                self.queue.task_done()

    def _handle_job(self, job):
        """Run one clone job and report its progress in the requesting chat."""
        name, new_name, as_user, chat_id, reply_to = job

        # Later status messages reply to the "Started!" message.
        reply_to = self.bot.execute('sendMessage', {
            'chat_id': chat_id,
            'text': 'Started!',
            'reply_to_message_id': reply_to,
        })['message_id']

        try:
            self.clone_sticker_set(name, new_name, as_user)
            msg = 'Done!'
        except BotException as e:
            msg = str(e)
        except Exception as e:
            print(e)
            msg = 'Error'

        self.bot.execute('sendMessage', {
            'chat_id': chat_id,
            'text': msg,
            'reply_to_message_id': reply_to,
        })
class ClonerBot(Bot):
    """Telegram bot that clones sticker packs on request.

    Incoming messages are read with long polling; clone jobs are pushed onto
    a queue that is drained by ``nthreads`` daemon ``ClonerThread`` workers.
    """

    def __init__(self, token, nthreads=3, timeout=15, debug=False, log_chat_id=None):
        """Create the bot and start its worker threads.

        Args:
            token: bot token.
            nthreads: number of ``ClonerThread`` workers to start.
            timeout: value sent as the ``timeout`` parameter of ``getUpdates``.
            debug: enables debug printing.
            log_chat_id: chat that receives ``log`` messages, or ``None`` to
                only print them.
        """
        super().__init__(token=token, debug=debug)
        self.timeout = timeout
        self.log_chat_id = log_chat_id
        self.queue = Queue()
        for _ in range(nthreads):
            th = ClonerThread(self, self.queue)
            # ``setDaemon`` is deprecated; the attribute is the supported form.
            th.daemon = True
            th.start()

    def log(self, msg):
        """Print ``msg`` and, if a log chat is configured, send it there too.

        A ``BotException`` while sending is printed and otherwise ignored.
        """
        print('LOG:', msg)
        if self.log_chat_id is not None:
            try:
                self.execute('sendMessage', {
                    'chat_id': self.log_chat_id,
                    'text': msg
                })
            except BotException as e:
                print('LOG FAILED:', e)

    def _reply(self, chat_id, reply_to, text):
        """Send ``text`` to ``chat_id`` as a reply and return the sent message."""
        return self.execute('sendMessage', {
            'text': text,
            'chat_id': chat_id,
            'reply_to_message_id': reply_to
        })

    def clone_sticker_set_safe(self, name, as_user, reply_to, chat_id):
        """Validate a clone request and enqueue it for the worker threads.

        The request is refused when ``name`` is itself a clone made by this
        bot, or when the target set ``<name>_by_<bot username>`` can already
        be fetched. Otherwise a confirmation is sent and the job is queued.
        """
        if self.DEBUG:
            print('clone_sticker_set_safe(', name, as_user, reply_to,
                  chat_id)

        username = self.me['username']
        if name.lower().endswith('_by_'+username.lower()):
            self._reply(chat_id, reply_to, 'Can\'t clone the clone!')
            return

        new_name = '{}_by_{}'.format(name, username)
        try:
            # Any BotException here is taken to mean the set does not exist.
            self.execute('getStickerSet', {'name': new_name})
        except BotException:
            pass
        else:
            self._reply(chat_id, reply_to,
                        'Was cloned already to '
                        't.me/addstickers/%s'%new_name)
            return

        reply_to = self._reply(
            chat_id, reply_to,
            'Cloning to t.me/addstickers/%s.\n'
            '%d in queue.'%(new_name, self.queue.qsize()+1)
        )['message_id']
        self.queue.put((name, new_name, as_user, chat_id, reply_to))

    def start_clone_polling(self):
        """Poll ``getUpdates`` forever and dispatch every received message.

        An exception raised while handling a single message is printed and
        the loop continues with the next update; errors from ``getUpdates``
        itself still propagate to the caller.
        """
        import json

        # The Bot API expects array parameters as a JSON-serialized string
        # when they are sent in the query string.
        allowed_updates = json.dumps(['message'])
        offset = 0
        while True:
            updates = self.execute('getUpdates', {
                'offset': offset,
                'timeout': self.timeout,
                'allowed_updates': allowed_updates
            })
            for update in updates:
                offset = max(update['update_id'] + 1, offset)
                message = update.get('message')
                if message is None:
                    continue
                try:
                    self.process_message(message)
                except Exception as e:
                    # Top-level boundary: one bad message (for example a
                    # reply that cannot be delivered) must not stop the bot.
                    print('process_message failed:', e)

    def process_message(self, message):
        """Handle one incoming message.

        ``/start`` and ``/help`` answer with usage text, ``/report <pack>``
        forwards a report to the log, and a sticker or an ``/addstickers/``
        link starts cloning of the referenced pack.
        """
        from_user = message.get('from', {}).get('id')
        from_chat = message['chat']['id']
        mid = message['message_id']
        text = message.get('text', '').strip()

        if text.startswith(('/start', '/help')):
            self._reply(from_chat, mid,
                        'Send me a sticker or addstickers'
                        ' link to clone pack\n'
                        'See also: /report')
            return

        if text.startswith('/report'):
            self._reply(from_chat, mid,
                        self._handle_report(text, from_chat, from_user, mid))
            return

        sname = ''
        match = re.search(r'/addstickers/([a-zA-Z\d_]+)', text)
        if match:
            sname = match.group(1)

        sticker = message.get('sticker')
        if sticker is not None:
            set_name = sticker.get('set_name')
            if set_name is not None:
                sname = set_name
            else:
                self._reply(from_chat, mid,
                            'Sticker doesn\'t belong to pack')

        if sname:
            self.clone_sticker_set_safe(sname, from_user, mid,
                                        from_chat)

    def _handle_report(self, text, from_chat, from_user, mid):
        """Process a ``/report`` command and return the text to answer with."""
        username = self.me['username']
        match = re.match(r'/report\s+([a-zA-Z\d_]+).*', text)
        if not match:
            return '\n'.join((
                'Usage: `/report pack_by_%s`'%username,
                'Report pack to admin',
                'Use if pack is outdated or something '
                'else is wrong with it'
            ))

        sname = match.group(1)
        if sname.lower().endswith(username.lower()):
            self.log('Report: chat:%s user:%s id:%s pack:%s'%(
                from_chat, from_user, mid, sname
            ))
            return 'Reported to admin!'
        # The pack name was previously never substituted into this message.
        return 'Cant report pack \'%s\'!'%sname
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
How to host this
Pls tell