Created
March 14, 2021 00:10
-
-
Save pedromaironi/9f785cacc9cb8e011aed17754e6e85e5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
################################################# | |
# BOT TOKEN # | |
################################################# | |
import os | |
import telebot | |
from telebot import types | |
from colorclass import Color | |
import json | |
import time | |
import six | |
import sys | |
import traceback | |
import re | |
import socket | |
from collections import OrderedDict | |
from pymongo import MongoClient | |
import requests | |
from random import choice | |
################################################# | |
# USEFUL FUNCTIONS AND DATAS # | |
################################################# | |
with open('extra_data/extra.json', 'r') as f: | |
extra = json.load(f) | |
bot = telebot.TeleBot(extra['token']) | |
logBot = telebot.TeleBot(extra['token_logbot']) | |
admins = extra['admins'] | |
filtered = list() | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
MESSAGE = extra['udp_message'] | |
UDP_IP = extra['udp_ip'] | |
UDP_PORT = extra['udp_port'] | |
client = MongoClient('localhost:27017') | |
db = client.users | |
with open('responses.json', encoding='utf-8') as f: | |
responses = json.load(f, object_pairs_hook=OrderedDict) | |
def send_udp(txt): | |
try: | |
sock.sendto(MESSAGE.format(txt).encode(), (UDP_IP, UDP_PORT)) | |
except: | |
pass | |
userStep = dict() | |
def is_recent(m): | |
return (time.time() - m.date) < 60 | |
def is_banned(uid): | |
if is_user(uid): | |
return db.usuarios.find_one(str(uid))['banned'] | |
else: | |
return False | |
def next_step_handler(uid): | |
""" Función para controlar los steps dentro de las diferentes funciones """ | |
if uid not in userStep or is_banned(uid): | |
userStep[uid] = 0 | |
return userStep[uid] | |
def lang(uid): | |
if db.usuarios.find_one(str(uid)): | |
return db.usuarios.find_one(str(uid))['lang'] | |
else: | |
return 'en' | |
def is_beta(uid): | |
""" Función para comprobar si un ID es beta """ | |
return uid in extra['beta'] | |
def was_user(cid): | |
return db.usuarios.find_one(str(cid)) is not None and db.usuarios.find_one(str(cid))['active'] == False | |
def is_user(cid): | |
return db.usuarios.find_one(str(cid)) is not None and db.usuarios.find_one(str(cid))['active'] == True | |
def is_admin(cid): | |
""" Función para comprobar si un ID es admin """ | |
return int(cid) in admins | |
def remove_tag(text): | |
TAG_RE = re.compile(r'<[^>]+>') | |
return TAG_RE.sub('', text) | |
def isint(s): | |
if not s: | |
return False | |
if s[0] in ('-', '+'): | |
return s[1:].isdigit() | |
return s.isdigit() | |
def user_info(cid, language=None): | |
from datetime import datetime | |
yes = "✔️" | |
no = "✖️" | |
user = db.usuarios.find_one(str(cid)) | |
reg_date = datetime.fromtimestamp(int(user['register'])).strftime('%d/%m/%Y') | |
notifications = yes if user['notify'] else no | |
summoner_name = user['summoner'] if user['summoner'] else no | |
region = user['server'].upper() if user['server'] else no | |
return responses['user_info'][lang(cid) if not language else language].format(reg_date, notifications, summoner_name, region) if int(cid) > 0 else responses['group_info'][lang(cid) if not language else language].format(reg_date, notifications) | |
def escape_markup(text): | |
if not isinstance(text, str): | |
return text | |
characters = ['_', '*', '['] | |
for character in characters: | |
text = text.replace(character, '\\' + character) | |
return text | |
def contact_format(m): | |
name = m.from_user.first_name | |
alias = str(m.from_user.username) | |
cid = str(m.chat.id) | |
uid = str(m.from_user.id) | |
m_id = str(m.message_id) | |
msg = m.text | |
if cid == uid: | |
txt = "Nuevo mensaje\n\nNombre: " + name + "\nAlias: @" + alias + "\nIdioma: " + \ | |
lang(cid) + "\nM_ID: " + m_id + "\nID: " + cid + "\n\nMensaje: " + msg | |
else: | |
txt = "Nuevo mensaje\n\nNombre: " + name + "\nAlias: @" + alias + "\nIdioma: " + lang( | |
cid) + "\nM_ID: " + m_id + "\nID: " + cid + "\nUID: " + uid + "\n\nMensaje: " + msg | |
return txt | |
with open('extra_data/file_ids.json', 'r') as f: | |
file_ids = json.load(f) | |
with open('extra_data/sorcio.json', 'r') as f: | |
sorcio = json.load(f) | |
data = dict() | |
for x in [ | |
'es', | |
'en', | |
'de', | |
'it', | |
'fr', | |
'pl', | |
'pt', | |
'ru', | |
'el', | |
'th', | |
'tr', | |
'ro']: | |
with open('champs_%s.json' % x, 'r') as f: | |
data[x] = json.load(f) | |
with open('champs_en.json', 'r') as f: | |
data['fa'] = data['ar'] = json.load(f) | |
with open('champs_keys.json', 'r') as f: | |
data['keys'] = json.load(f) | |
def line(alt=False): | |
if alt: | |
return u'\n—————————————————————————\n' | |
else: | |
return u'\n`—————————————————————————`\n' | |
def send_exception(exception): | |
exc_type, exc_obj, exc_tb = sys.exc_info() | |
tb = traceback.extract_tb(exc_tb, 4) | |
message = '\n`' + str(exc_type) + '`' | |
message += '\n\n`' + str(exc_obj) + '`' | |
for row in tb: | |
message += line() | |
for val in row: | |
message += '`' + str(val) + '`\n' | |
return message | |
def to_json(m): | |
d = {} | |
for x, y in six.iteritems(m.__dict__): | |
if hasattr(y, '__dict__'): | |
d[x] = to_json(y) | |
else: | |
d[x] = y | |
return d | |
def static_versions(): | |
return extra['static_version'] | |
base_regions = {'euw': 'euw1', | |
'ru': 'ru', | |
'kr': 'kr', | |
'br': 'br1', | |
'oce': 'oc1', | |
'na': 'na1', | |
'eune': 'eun1', | |
'lan': 'la1', | |
'las': 'la2', | |
'tr': 'tr1'} | |
def update_region(reg): | |
return base_regions.get(reg) | |
def get_3_best_champs(summonerId, region, cid): | |
url = 'https://{}.api.riotgames.com/lol/champion-mastery/v4/champion-masteries/by-summoner/{}'.format( | |
update_region(region.lower()), summonerId) | |
params = { | |
"api_key": extra['lol_api'] | |
} | |
jstr = requests.get( | |
url=url, | |
params=params | |
) | |
if jstr.status_code != 200: | |
return None | |
else: | |
return OrderedDict([(data[lang(cid)][data['keys'][str(x['championId'])]['key']][ | |
'name'], str(x['championLevel'])) for x in jstr.json()[:3]]) | |
def get_summoner(name, region): | |
url = "https://{}.api.riotgames.com/lol/summoner/v4/summoners/by-name/{}".format(region, name) | |
params = { | |
'api_key': extra['lol_api'] | |
} | |
r = requests.get(url, params) | |
if r.status_code != 200: | |
raise Exception("Error finding {} in {}".format(name, region)) | |
return r.json() | |
def get_current_game(summoner_id, region): | |
url = "https://{}.api.riotgames.com/lol/spectator/v4/active-games/by-summoner/{}".format(region, summoner_id) | |
params = { | |
'api_key': extra['lol_api'] | |
} | |
r = requests.get(url=url, params=params) | |
if r.status_code != 200: | |
raise Exception("The summoner with ID {} is not in game.".format(summoner_id)) | |
return r.json() | |
def get_summoner_info(invocador, region, cid): | |
try: | |
summoner = get_summoner(name=invocador, region=update_region(region)) | |
except: | |
txt = responses['summoner_error'][ | |
lang(cid)] % (invocador, region.upper()) | |
return txt | |
lattest_version = static_versions() | |
icon_id = summoner['profileIconId'] | |
icon_url = "http://ddragon.leagueoflegends.com/cdn/{}/img/profileicon/{}.png".format( | |
lattest_version, icon_id) | |
summoner_name = summoner['name'] | |
summoner_id = summoner['id'] | |
opgg_region = region.lower() if region.lower() != 'kr' else 'www' | |
opgg = 'http://{}.op.gg/summoner/userName={}'.format(opgg_region, ''.join(summoner_name.split())) | |
lolking = "http://www.lolking.net/summoner/" + \ | |
region + "/" + str(summoner_id) | |
summoner_level = summoner['summonerLevel'] | |
txt = responses['summoner_30_beta_1'][lang(cid)].format(icon_url, summoner_name, opgg, summoner_level) | |
if summoner_level > 29: | |
url = "https://{}.api.riotgames.com/lol/league/v4/entries/by-summoner/{}".format(update_region(region), summoner_id) | |
params = {'api_key': extra['lol_api']} | |
r = requests.get(url, params) | |
if r.status_code != 200: | |
txt = "ERROR EN LÍNEA 89 DE me.py" | |
return txt | |
r_json = r.json() | |
if not r_json: | |
aux = {} | |
if len(r_json) == 1: | |
aux = {r_json[0]['queueType']:r_json[0]} | |
else: | |
aux = {x['queueType']:x for x in r_json} | |
for x in aux: | |
txt += responses['summoner_30_beta_2'][lang(cid)].format( | |
"SoloQ" if x == "RANKED_SOLO_5x5" else "FlexQ" if x == "RANKED_FLEX_SR" else x, | |
responses['tier'][lang(cid)][aux[x]['tier']], | |
aux[x]['rank'], | |
aux[x]['wins'], | |
aux[x]['losses'], | |
aux[x]['leaguePoints']) | |
try: | |
bst = get_3_best_champs(summoner['id'], region, cid) | |
if bst: | |
txt += '\n\n*' + responses['best_champs'][lang(cid)] + '*:' | |
for x, y in bst.items(): | |
txt += '\n- ' + x + ' _(Level: ' + y + ')_' | |
except Exception as e: | |
bot.send_message(52033876, send_exception(e), parse_mode="Markdown") | |
return txt | |
def restart_process(): | |
os.popen('kill -9 {}'.format(os.getpid())) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment