Skip to content

Instantly share code, notes, and snippets.

@alemidev
Last active March 6, 2022 19:03
Show Gist options
  • Save alemidev/4d06398cb018421dfb1bcfc828fd6d1b to your computer and use it in GitHub Desktop.
Save alemidev/4d06398cb018421dfb1bcfc828fd6d1b to your computer and use it in GitHub Desktop.
Some tools to debug bots and applications as a plugin for my telegram bot framework
import asyncio
import io
import logging
import sys
import inspect
from alemibot import alemiBot
from pyrogram import filters
from pyrogram.types import MessageEntity, ReplyKeyboardMarkup
from alemibot.util.help import CATEGORIES
from alemibot.util.command import _Message as Message
from alemibot.util import (
filterCommand, cleartermcolor, get_user, ProgressChatAction, is_me, edit_or_reply,
sudo, is_allowed, report_error, set_offline, cancel_chat_action, HelpCategory
)
# Module-level logger named after this module; the command handlers below log through it.
logger = logging.getLogger(__name__)
class stdoutWrapper():
    """Context manager that redirects ``sys.stdout`` and ``sys.stderr`` into one in-memory buffer.

    Entering returns the ``io.StringIO`` buffer; leaving puts back the streams
    that were active when the block was entered. Exceptions raised inside the
    block are not suppressed.
    """

    def __init__(self):
        self.buffer = io.StringIO()
        # Kept for backward compatibility with code reading these attributes
        # before entering; the authoritative values are captured in __enter__.
        self.old_stdout = sys.stdout
        self.old_stderr = sys.stderr

    def __enter__(self):
        # Capture the streams at entry time rather than at construction time:
        # if something else redirected stdout/stderr in between, restoring the
        # construction-time values would silently undo that redirect.
        self.old_stdout = sys.stdout
        self.old_stderr = sys.stderr
        sys.stdout = self.buffer
        sys.stderr = self.buffer
        return self.buffer

    def __exit__(self, exc_type, exc_value, exc_traceback):
        sys.stdout = self.old_stdout
        sys.stderr = self.old_stderr
# Help category named "DEBUGTOOL"; the commands below attach themselves to it via @HELP.add(...).
HELP = HelpCategory("DEBUGTOOL")
@HELP.add(cmd="[<path>]")
@alemiBot.on_message(sudo & filterCommand("put"))
@report_error(logger)
@set_offline
@cancel_chat_action
async def put_cmd(client: alemiBot, message: Message):
    """save file to server
    Reply to a media message or attach media to store file on server.
    File will be saved in `downloads` folder if no path is specified.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    progress = ProgressChatAction(client, message.chat.id, "find_location")
    # First positional argument is the destination; fall back to the downloads folder.
    target_path = message.command[0] or "downloads/"
    # A replied-to message, when present, is the media source instead of the command message.
    source = message if message.reply_to_message is None else message.reply_to_message
    if not source.media:
        await edit_or_reply(message, "`[!] → ` No file given")
        return
    saved_as = await client.download_media(source, file_name=target_path, progress=progress.tick)
    await edit_or_reply(message, f'` → ` saved file as `{saved_as}`')
@HELP.add(cmd="<path>")
@alemiBot.on_message(sudo & filterCommand("get", flags=["-log"]))
@report_error(logger)
@set_offline
@cancel_chat_action
async def get_cmd(client: alemiBot, message: Message):
    """request a file from server
    Will upload a file from server to this chat.
    The path can be absolute or relative (starting from alemiBot workdir).
    Use flag `-log` to automatically upload `data/debug.log`.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    replied = message.reply_to_message
    if replied and replied.reply_markup and isinstance(replied.reply_markup, ReplyKeyboardMarkup):
        return await edit_or_reply(message, "`[!] → ` Not allowed from ReplyKeyboard")
    wants_log = message.command["-log"]
    if not wants_log and len(message.command) < 1:
        return await edit_or_reply(message, "`[!] → ` No input")
    progress = ProgressChatAction(client, message.chat.id)
    if not wants_log:
        # Regular case: upload the file at the given path as-is.
        path = message.command[0]
        await client.send_document(
            message.chat.id,
            path,
            reply_to_message_id=message.message_id,
            caption=f'` → ` **{path}**',
            progress=progress.tick,
        )
        return
    # Special case for the debug log: read it into memory so it can be scrubbed before upload.
    with open("data/debug.log") as log_file:
        contents = log_file.read()
    if len(client.session_name) > 25:  # It's most likely a session string
        # Replace the session string so it does not end up in the uploaded log.
        contents = contents.replace(client.session_name, "client")
    payload = io.BytesIO(contents.encode("utf-8"))
    payload.name = "debug.log"
    await client.send_document(
        message.chat.id,
        payload,
        reply_to_message_id=message.message_id,
        caption='` → ` **logfile**',
        progress=progress.tick,
    )
@HELP.add(cmd="<cmd>")
@alemiBot.on_message(sudo & filterCommand(["run", "r"], options={
    "timeout" : ["-t", "-time"]
}))
@set_offline
@cancel_chat_action
async def run_cmd(client: alemiBot, message: Message):
    """run a command on server
    Shell will be from user running bot: every command starts in bot root folder.
    There is a timeout of 60 seconds to any command issued, this can be changed with the `-t` option.
    You should properly wrap your arguments with `\"`, they will be ignored by cmd parser.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    reply_to = message.reply_to_message
    if reply_to and reply_to.reply_markup and isinstance(reply_to.reply_markup, ReplyKeyboardMarkup):
        return await edit_or_reply(message, "`[!] → ` Not allowed from ReplyKeyboard")
    timeout = float(message.command["timeout"] or 60)
    args = message.command.text
    msg = await edit_or_reply(message, "` → ` Running")
    try:
        logger.info("Executing shell command \"%s\"", args)
        # The command text is handed to a shell on purpose; the handler filter is `sudo`.
        proc = await asyncio.create_subprocess_shell(
            args,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT)
        try:
            stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout)
        except asyncio.exceptions.TimeoutError:
            # wait_for() only cancels communicate(); without an explicit kill the
            # child would keep running after we report the timeout.
            # NOTE(review): this targets the shell process itself; processes it
            # spawned may outlive it — confirm whether that matters here.
            if proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # process exited between the check and the kill
            raise
        # Replace undecodable bytes instead of failing the whole command on binary output.
        result = cleartermcolor(stdout.decode(errors="replace")).rstrip()
        if len(args) + len(result) > 4080:
            await msg.edit(f"`$` `{args}`\n` → Output too long, sending as file`")
            prog = ProgressChatAction(client, message.chat.id)
            out = io.BytesIO((f"$ {args}\n" + result).encode('utf-8'))
            out.name = "output.txt"
            await client.send_document(message.chat.id, out, progress=prog.tick)
        else:
            output = f"$ {args}"
            entities = [ MessageEntity(type="code", offset=0, length=len(output)) ]
            if len(result) > 0:
                # +2 skips the blank line inserted between the prompt and the output.
                entities.append(MessageEntity(type="pre", offset=len(output) + 2, length=len(result), language="bash"))
                output += "\n\n" + result
            await msg.edit(output, entities=entities)
    except asyncio.exceptions.TimeoutError:
        await msg.edit(f"`$` `{args}`\n`[!] → ` Timed out")
    except Exception as e:
        logger.exception("Error in .run command")
        await msg.edit(f"`$ {args}`\n`[!] → ` " + str(e))
@HELP.add(cmd="<expr>")
@alemiBot.on_message(sudo & filterCommand(["eval", "e"]))
@set_offline
@cancel_chat_action
async def eval_cmd(client: alemiBot, message: Message):
    """eval a python expression
    No imports can be made nor variables can be assigned :`eval` cannot have side effects.
    Returned value will be printed upon successful evaluation. `stdout` won't be captured (use `.ex`).
    If a coroutine is returned, it will be awaited. This won't tokenize large outputs per-line, use .ex if you need that.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    # The locals bound before eval() (client, message, reply_to, args, msg) are
    # visible to the evaluated expression, so their names are part of the
    # command's behaviour and must not be renamed.
    reply_to = message.reply_to_message
    if reply_to and reply_to.reply_markup and isinstance(reply_to.reply_markup, ReplyKeyboardMarkup):
        return await edit_or_reply(message, "`[!] → ` Not allowed from ReplyKeyboard")
    args = message.command.text
    msg = await edit_or_reply(message, "` → ` Evaluating")
    try:
        logger.info("Evaluating \"%s\"", args)
        # SECURITY: eval() runs text taken from a chat message with full access
        # to this process. The handler filter is `sudo`; keep it that way.
        result = eval(args)
        if inspect.iscoroutine(result):
            result = await result
        result = str(result).rstrip()
        if len(args) + len(result) <= 4080:
            header = f">>> {args}"
            entities = [MessageEntity(type="code", offset=0, length=len(header))]
            text = header
            if result:
                # The second entity starts at the newline, hence length + 1.
                entities.append(MessageEntity(type="code", offset=len(header), length=len(result) + 1))
                text = header + "\n" + result
            await msg.edit(text, entities=entities)
        else:
            await msg.edit(f"```>>> {args}\n → Output too long, sending as file```")
            progress = ProgressChatAction(client, message.chat.id)
            attachment = io.BytesIO((f">>> {args}\n" + result).encode('utf-8'))
            attachment.name = "output.txt"
            await client.send_document(message.chat.id, attachment, parse_mode="markdown", progress=progress.tick)
    except Exception as err:
        logger.exception("Error in .eval command")
        await msg.edit(f"`>>> {args}`\n`[!] {type(err).__name__} → ` {str(err)}", parse_mode='markdown')
async def aexec(code, client, message): # client and message are passed so they are in scope
    """Run a block of Python source as the body of a coroutine and await it.

    The source is wrapped in ``async def __aex():`` (each line indented by one
    space) so that ``await`` and ``return`` are usable at its top level.

    Args:
        code: Python source to execute.
        client: made available to the executed code under the name ``client``.
        message: made available to the executed code under the name ``message``.

    Returns:
        Whatever the executed code returns (``None`` if it does not return).

    Raises:
        SyntaxError: if the wrapped source does not compile.
        Exception: anything raised by the executed code propagates unchanged.
    """
    # Explicit namespace instead of exec(..., locals()) + locals()['__aex']:
    # that relied on locals() returning the same dict on both calls, which is
    # no longer true on Python 3.13+, where each call is an independent snapshot.
    # The executed code sees the same three names as before.
    namespace = {"code": code, "client": client, "message": message}
    body = ''.join(f'\n {l}' for l in code.split('\n'))
    # SECURITY: exec() of caller-supplied source — only call with trusted input.
    exec('async def __aex(): ' + body, namespace)
    return await namespace['__aex']()
@HELP.add(cmd="<code>")
@alemiBot.on_message(sudo & filterCommand(["exec", "ex"]))
@set_offline
@cancel_chat_action
async def exec_cmd(client: alemiBot, message: Message):
    """execute python code
    Will capture and print stdout.
    This, unlike `eval`, has no bounds and **can have side effects**. Use with more caution than `eval`!
    The `exec` call is wrapped to make it work with async code.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    reply_to = message.reply_to_message
    if reply_to and reply_to.reply_markup and isinstance(reply_to.reply_markup, ReplyKeyboardMarkup):
        return await edit_or_reply(message, "`[!] → ` Not allowed from ReplyKeyboard")
    args = message.command.text
    # Display form of the code: continuation lines get a "... " prompt.
    fancy_args = args.replace("\n", "\n... ")
    msg = message if is_me(message) else await message.reply("`[PLACEHOLDER]`")
    await msg.edit("```>>> " + fancy_args + "```\n` → ` Executing")
    try:
        logger.info("Executing python expr:\n\t%s", args.replace('\n', '\n\t'))
        # SECURITY: runs arbitrary code from a chat message; the handler filter is `sudo`.
        with stdoutWrapper() as fake_stdout:
            await aexec(args, client, message)
        result = fake_stdout.getvalue().rstrip()
        # Measure the text that is actually sent: fancy_args is 4 characters
        # longer than args for every newline, so checking len(args) let
        # multi-line code slip past the limit this check guards.
        if len(fancy_args) + len(result) > 4080:
            await msg.edit(f"`>>>` `{fancy_args}`\n` → Output too long, sending as file`")
            prog = ProgressChatAction(client, message.chat.id)
            out = io.BytesIO((f">>> {fancy_args}\n" + result).encode('utf-8'))
            out.name = "output.txt"
            await client.send_document(message.chat.id, out, parse_mode='markdown', progress=prog.tick)
        else:
            output = f">>> {fancy_args}"
            entities = [ MessageEntity(type="pre", offset=0, length=len(output), language="python") ]
            if len(result) > 0:
                # +2 skips the blank line inserted between the code and its output.
                entities.append(MessageEntity(type="pre", offset=len(output) + 2, length=len(result), language="python"))
                output += "\n\n" + result
            await msg.edit(output, entities=entities)
    except Exception as e:
        logger.exception("Error in .exec command")
        await msg.edit(f"`>>> {args}`\n`[!] {type(e).__name__} → ` {str(e)}", parse_mode='markdown')
@HELP.add(cmd="[<target>]", sudo=False)
@alemiBot.on_message(is_allowed & filterCommand("where", flags=["-no"]))
@report_error(logger)
@set_offline
@cancel_chat_action
async def where_cmd(client: alemiBot, message: Message):
    """get info about a chat
    Get complete information about a chat and send it as json.
    If no chat name or id is specified, current chat will be used.
    Add `-no` at the end if you just want the id : no file will be attached.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    tgt = message.chat
    prog = ProgressChatAction(client, message.chat.id)
    if len(message.command) > 0:
        arg = message.command[0]
        # Accept an optional leading minus: str.isnumeric() alone rejects
        # "-100…" style ids, which were then passed on as strings.
        # NOTE(review): assumes client.get_chat needs an int to treat the value
        # as a numeric chat id — confirm against pyrogram's peer resolution.
        unsigned = arg[1:] if arg.startswith("-") else arg
        # isdecimal() matches exactly the digit strings that int() can parse.
        tgt = await client.get_chat(int(arg) if unsigned.isdecimal() else arg)
    await edit_or_reply(message, f"` → ` Getting data of chat `{tgt.id}`")
    if not message.command["-no"]:
        out = io.BytesIO((str(tgt)).encode('utf-8'))
        out.name = f"chat-{tgt.id}.json"
        await client.send_document(message.chat.id, out, progress=prog.tick)
@HELP.add(cmd="[<target>]", sudo=False)
@alemiBot.on_message(is_allowed & filterCommand("who", flags=["-no"]))
@report_error(logger)
@set_offline
@cancel_chat_action
async def who_cmd(client: alemiBot, message: Message):
    """get info about a user
    Get complete information about user and attach as json.
    If replying to a message, author will be used.
    An id or @ can be specified. If neither is applicable, self will be used.
    Use `-no` flag if you just want the id.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    # Default target: whatever get_user() yields for the command message itself.
    target = get_user(message)
    progress = ProgressChatAction(client, message.chat.id)
    replied = message.reply_to_message
    if len(message.command) > 0:
        # An explicit argument wins over a reply; digit-only values are looked up by id.
        wanted = message.command[0]
        if wanted.isnumeric():
            wanted = int(wanted)
        target = await client.get_users(wanted)
    elif replied is not None:
        target = get_user(replied)
    await edit_or_reply(message, f"` → ` Getting data of user `{target.id}`")
    if message.command["-no"]:
        return
    dump = io.BytesIO(str(target).encode('utf-8'))
    dump.name = f"user-{target.id}.json"
    await client.send_document(message.chat.id, dump, progress=progress.tick)
@HELP.add(cmd="[<target>]", sudo=False)
@alemiBot.on_message(is_allowed & filterCommand("what", options={
    "group" : ["-g", "-group"]
}, flags=["-no"]))
@report_error(logger)
@set_offline
@cancel_chat_action
async def what_cmd(client: alemiBot, message: Message):
    """get info about a message
    Get complete information about a message and attach as json.
    If replying, replied message will be used.
    Id and chat can be passed as arguments. If no chat is specified with `-g`, message will be searched in current chat.
    Append `-no` if you just want the id.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    msg = message
    prog = ProgressChatAction(client, message.chat.id)
    if message.reply_to_message is not None:
        # Re-fetch the replied message by id instead of using the embedded copy.
        msg = await client.get_messages(message.chat.id, message.reply_to_message.message_id)
    elif len(message.command) > 0 and message.command[0].isnumeric():
        chat_id = message.chat.id
        if "group" in message.command:
            group = message.command["group"]
            # Accept an optional leading minus: str.isnumeric() alone rejects
            # "-100…" style ids, which were then passed to get_chat as strings.
            # NOTE(review): assumes numeric chat ids must reach pyrogram as int
            # and that the option parser can deliver a "-…" value; confirm both.
            unsigned = group[1:] if group.startswith("-") else group
            if unsigned.isdecimal():
                chat_id = int(group)
            else:
                chat_id = (await client.get_chat(group)).id
        msg = await client.get_messages(chat_id, int(message.command[0]))
    await edit_or_reply(message, f"` → ` Getting data of msg `{msg.message_id}`")
    if not message.command["-no"]:
        out = io.BytesIO((str(msg)).encode('utf-8'))
        out.name = f"msg-{msg.message_id}.json"
        await client.send_document(message.chat.id, out, progress=prog.tick)
@HELP.add()
@alemiBot.on_message(sudo & filterCommand(["joined", "jd"]))
@report_error(logger)
@set_offline
@cancel_chat_action
async def joined_cmd(client: alemiBot, message: Message):
    """count active chats
    Get number of dialogs: groups, supergroups, channels, dms, bots.
    Will show them divided by category and a total.
    """
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    status = await edit_or_reply(message, "` → ` Counting...")
    # Tally dialogs per chat type; dict insertion order fixes the order of the report.
    per_type = {}
    with ProgressChatAction(client, message.chat.id, "choose_contact"):
        async for dialog in client.iter_dialogs():
            kind = dialog.chat.type
            per_type[kind] = per_type.get(kind, 0) + 1
    total = sum(per_type.values())
    lines = [f"`→ ` **{total}** --Active chats-- \n"]
    lines.extend(f"` → ` **{kind}** {count}\n" for kind, count in per_type.items())
    await status.edit("".join(lines))
@HELP.add()
@alemiBot.on_message(sudo & filterCommand(['tasks', 'task']))
@report_error(logger)
@set_offline
async def running_tasks_cmd(client: alemiBot, message: Message):
    """show running callbacks
    Will print running handler callbacks, with their hash.
    To be able to use these functions, you need my (experimental!) pyrogram fork :
    pip install https://github.com/alemidev/pyrogram/archive/task_management.zip"""
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    # `client.running` only exists on the pyrogram fork named in the docstring.
    if not hasattr(client, "running"): # ugly check eww
        return await edit_or_reply(message, "<code>[!] → </code> This pyrogram version lacks task management.", parse_mode="html")
    template = "<b>[</b><code>{hash}</code><b>]</b> {name}\n"
    # One line per entry: its key in client.running and the __name__ of the stored object.
    listing = "".join(
        template.format(hash=key, name=client.running[key].__name__)
        for key in client.running
    )
    await edit_or_reply(message, listing, parse_mode="html")
@HELP.add(cmd="<hash>")
@alemiBot.on_message(sudo & filterCommand(['stop', 'cancel']))
@report_error(logger)
@set_offline
async def cancel_task_cmd(client: alemiBot, message: Message):
    """cancel running callbacks
    Will immediately stop a running callback.
    To be able to use these functions, you need my (experimental!) pyrogram fork :
    pip install https://github.com/alemidev/pyrogram/archive/task_management.zip"""
    # NOTE(review): docstring left verbatim — it is presumably consumed as help text by HELP.add; confirm.
    # `client.running` only exists on the pyrogram fork named in the docstring.
    if not hasattr(client, "running"): # ugly check eww
        return await edit_or_reply(message, "<code>[!] → </code> This pyrogram version lacks task management.", parse_mode="html")
    if len(message.command) < 1:
        return await edit_or_reply(message, "<code>[!] → </code> No task hash provided", parse_mode="html")
    task_hash = int(message.command[0])
    # Remove the entry first, then close it; an unknown hash raises KeyError from pop().
    task = client.running.pop(task_hash)
    task.close()
    await edit_or_reply(message, f"<code> → </code> Canceled task <code>{task_hash}</code>", parse_mode="html")
@alemiBot.on_message(sudo & filterCommand(["make_botfather_list"], flags=["-all"]))
@report_error(logger)
@set_offline
async def botfather_list_command(client: alemiBot, message: Message):
    """make botfather-compatible command list"""
    # Emits one "<title> - <args> | <shorttext>" line per help entry; non-public
    # entries are included only when the `-all` flag is given.
    show_all = message.command["-all"]
    lines = []
    for category in CATEGORIES:
        entries = CATEGORIES[category].HELP_ENTRIES
        for key in entries:
            entry = entries[key]
            if show_all or entry.public:
                lines.append(f"{entry.title} - {entry.args} | {entry.shorttext}\n")
    await message.reply("".join(lines), parse_mode='markdown')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment