Skip to content

Instantly share code, notes, and snippets.

@woshishabii
Last active May 2, 2023 16:21
Show Gist options
  • Save woshishabii/518b890714d43cd4be3a104ab74dc3ab to your computer and use it in GitHub Desktop.
Save woshishabii/518b890714d43cd4be3a104ab74dc3ab to your computer and use it in GitHub Desktop.
src for MTF Bot
#!/usr/bin/python3.10
import telepot
import time
import urllib3
import sqlalchemy.orm
import datetime
import pytz
import pprint
# NOTE: rebinding the module-level name `print` means every later print()
# call in this file goes through pprint.pprint.
print = pprint.pprint
VERSION = '0.0.1'
# Load the bot token from a local file; surrounding whitespace is stripped.
with open('./tg_token') as f:
    BOT_TOKEN = f.read().strip()
# Set up the ORM: a SQLite file database next to the script.
# check_same_thread=False is passed through to the SQLite driver --
# presumably because handlers run on a different thread than the one that
# created the connection (TODO confirm against telepot's message loop).
engine = sqlalchemy.create_engine(
    'sqlite:///./tg_bot.db?check_same_thread=False',
    echo=False,
)
Base = sqlalchemy.orm.declarative_base()
class Report(Base):
    """ORM model for one submitted report, stored in the ``report`` table."""

    __tablename__ = 'report'

    # Surrogate primary key.
    ID = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    # Twitter handle/ID the report is about.
    twitter_id = sqlalchemy.Column(sqlalchemy.String(100))
    # Free-text description supplied by the submitter.
    description = sqlalchemy.Column(sqlalchemy.String(1000))
    # Only rows with reviewed == True are returned by query().
    reviewed = sqlalchemy.Column(sqlalchemy.Boolean)
    # Creation timestamp. The default MUST be a callable: passing the result
    # of datetime.now(...) directly would be evaluated once at class
    # definition, stamping every row with the process start time.
    time = sqlalchemy.Column(
        sqlalchemy.DateTime(timezone=True),
        default=lambda: datetime.datetime.now(tz=pytz.timezone("Asia/Shanghai")),
    )
# Create any missing tables, then open the single module-wide session that
# the command functions share.
Base.metadata.create_all(bind=engine, checkfirst=True)
session_maker = sqlalchemy.orm.sessionmaker(engine)
session = session_maker()
# You can leave this bit out if you're using a paid PythonAnywhere account
proxy_url = "http://proxy.server:3128"
# Replace telepot's connection pools with proxy-aware ones.
telepot.api._pools = {
    'default': urllib3.ProxyManager(
        proxy_url=proxy_url,
        num_pools=3,
        maxsize=10,
        retries=False,
        timeout=30,
    ),
}
telepot.api._onetime_pool_spec = (
    urllib3.ProxyManager,
    {
        'proxy_url': proxy_url,
        'num_pools': 1,
        'maxsize': 1,
        'retries': False,
        'timeout': 30,
    },
)
# end of the stuff that's only needed for free accounts
bot = telepot.Bot(BOT_TOKEN)
# Show the bot's own account info at startup.
print(bot.getMe())
def help(command=''):
    """
    Show Help
    :param command: help for specified command
    :return: help text; with no ``command``, a banner followed by one
        ``name: summary`` line per entry in COMMANDS, where the summary is
        the first line of that command's docstring
    """
    if command:
        return "TODO: Not implemented"

    banner = f"--===MTF Bot🏳️‍⚧️===--\nVersion:{VERSION}\nCommands:\n"
    entries = []
    for name, func in COMMANDS.items():
        # __doc__ is None when a command has no docstring (or when Python
        # runs with -OO); fall back instead of crashing the whole listing.
        doc = (func.__doc__ or '').strip()
        summary = doc.splitlines()[0] if doc else '(no description)'
        entries.append(f'{name}: {summary}')
    return banner + '\n'.join(entries)
def submit(twitter_id, description):
    """
    Submit a Report
    :param twitter_id: Twitter ID of the abuser
    :param description: Description
    :return: confirmation text echoing the stored values
    :raises Exception: whatever the database layer raises on a failed
        commit; the shared session is rolled back before re-raising
    """
    # New reports start unreviewed, so query() will not return them yet.
    new_report = Report(twitter_id=twitter_id, description=description, reviewed=False)
    session.add(new_report)
    try:
        session.commit()
    except Exception:
        # The session is shared module-wide: without a rollback, one failed
        # commit would leave it unusable for every later command.
        session.rollback()
        raise
    return (
        f"===========================\n"
        f"Submit Successful:\n"
        f"Twitter: https://twitter.com/{twitter_id}\n"
        f"Description: {description}\n"
        f"==========================="
    )
def query(twitter_id):
    """
    Query by twitter ID
    :param twitter_id: Twitter ID
    :return: a header line followed by every reviewed report whose
        twitter_id contains the given text, or a short notice if none match
    """
    matches = (
        session.query(Report)
        # Substring match; autoescape stops '%' and '_' in user input from
        # acting as LIKE wildcards.
        .filter(Report.twitter_id.contains(twitter_id, autoescape=True))
        # Unreviewed submissions are never shown.
        .filter(Report.reviewed.is_(True))
        .all()
    )
    header = '--===Result===--'
    if not matches:
        return f'{header}\nNo reviewed reports found.'
    entries = [
        f'Twitter: https://twitter.com/{row.twitter_id}\nDescription:{row.description}'
        for row in matches
    ]
    # The header goes on its own line; entries are separated by a blank line.
    return header + '\n' + '\n\n'.join(entries)
# Command registry: maps the command word (the text after '/') to the
# function that implements it.
# Register your commands here after declaring them above.
# Note that:
#   - the arguments are passed in as str
#   - each command should return a non-empty string (it is sent back as the reply)
#   - each command should have a docstring (help() lists its first line)
COMMANDS = {
'submit': submit,
'help': help,
'query': query
}
# Dispatch a tokenised command to its registered handler.
def parse_command(*args):
    """
    Look up ``args[0]`` in COMMANDS and call it with the remaining tokens.

    :param args: command name followed by its string arguments
    :return: the handler's reply, or an error message when the command is
        missing/unknown or the number of arguments does not fit the handler
    """
    # An empty token list is treated like an unknown command instead of
    # raising IndexError on args[0].
    if not args or args[0] not in COMMANDS:
        return 'Invalid Command, pls check it'

    handler = COMMANDS[args[0]]
    params = args[1:]
    # Accepted arity is derived from the handler's signature: parameters
    # with defaults are optional, the rest are required.
    command_max_arg = handler.__code__.co_argcount
    command_min_arg = command_max_arg - len(handler.__defaults__ or ())
    if command_min_arg <= len(params) <= command_max_arg:
        return handler(*params)
    return f"Invalid Argument Length(required [{command_min_arg}, {command_max_arg}]), pls check it"
# Handle incoming messages
def handle(msg):
    """
    Reply to one incoming message.

    Text starting with '/' is tokenised and dispatched through
    parse_command; any other text gets a hint about the missing '/'.
    Non-text messages are ignored.

    :param msg: message dict as delivered by telepot
    """
    # parse the 'msg' using telepot.glance
    content_type, _chat_type, chat_id = telepot.glance(msg)
    print(msg)
    if content_type != 'text':
        return

    text = msg['text']
    if not text.startswith('/'):
        bot.sendMessage(chat_id, "It seems that you missed the '/'")
        return

    # split() with no separator collapses runs of whitespace, so repeated or
    # trailing spaces no longer produce empty-string arguments. A bare "/"
    # yields no tokens; fall back to an empty command name so it is
    # reported as an invalid command.
    parsed_command = text[1:].split() or ['']
    # Telegram clients may send "/cmd@BotName"; keep only the command word.
    parsed_command[0] = parsed_command[0].partition('@')[0]
    print(parsed_command)
    res = parse_command(*parsed_command)
    bot.sendMessage(chat_id, res)
# Hand incoming messages to handle(), then announce readiness.
bot.message_loop(handle)
print('Listening ...')
# Keep the program running.
while True:
    time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment