Skip to content

Instantly share code, notes, and snippets.

@RedL0tus RedL0tus/saas.py
Last active Dec 30, 2018

Embed
What would you like to do?
Telegram Spammer as a Service
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Telegram Spammer as a Service - Test how fast your anti-spam bot is
# Copyright (C) 2018 Kay Lin <kaymw@member.fsf.org>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
__author__ = 'Kay Lin <kaymw@member.fsf.org>'
import os
import json
import time
import logging
from pyrogram import Client, Filters, Error
# Pyrogram client shared by the message handler and the __main__ entry point.
app = Client('homo')
# Text posted into joined groups by spam(); ``{user}`` is substituted with the
# requester's display string via str.format.
SPAM = '''\
{user} 请求测试本群的 CAPTCHA 系统,下面是 spam 正文:
同性交友,排解寂寞
https://github.com/
此帐号是一个用于测试 CAPTCHA bot 封禁速度的 user bot,详细信息请私聊。
'''
# Usage text sent (with parse_mode='HTML') in reply to any private message
# that is not a ``'spam `` command.
HELP = '''\
这是一个用以测试群组防 Spam 措施(主要是 CAPTCHA 类)反应速度的 userbot。
使用方法:
<code>'spam [群组链接(如果是私有群)或群组用户名(公开群组)]{1,5}</code>
注:[]{1,5} 为正则语法,表示至少一个最多五个
请不要滥用,请仅在管理员同意之后使用本 bot,也请各位管理员不要封禁此帐号,(不然我会很头疼...)。
本 bot 是开源程序,完整源码可在这里( https://gist.github.com/RedL0tus/bbb5e8d12860a1252f363ddfddad6f10 )找到,使用 GPLv3+ 协议发布,如要重新使用请遵守许可条款。
如遇到问题请咨询 @TheSaltedFish。
'''
# Fallback reply used by spam_wrapper() when it has no result text to report.
UNKNOWN_ERROR = '''\
蜜汁错误
'''
# Reply sent when the requester issued another request within the last 60 s.
RATE_LIMIT_NOTICE = '''\
您已被流量限制,请一分钟后再试
'''
# Reply sent when the command carries no target at all.
TOO_FEW_ARGUMENTS = '''\
至少需要一个用户名或链接
'''
# Reply sent when the command carries more than five targets.
TOO_MANY_ARGUMENTS = '''\
一次最多只能 spam 五个群
'''
# Banner printed once at start-up by the __main__ block.
LICENSE_NOTICE = '''\
Spammer as a Service version 1, Copyright (C) 2018 Kay Lin
Licensed under the terms of the GNU General Public License v3 or later (GPLv3+)
'''
# Maps a requester's user id to the integer timestamp of their last
# completed request; consulted by spam_wrapper() for the 60 s rate limit.
RATE_LIMIT = dict()
# In-memory list of event dicts; spam_wrapper() appends one per request and
# rewrites the whole list to the JSON log file each time.
HISTORY = []
# Process start time (whole seconds); used as the suffix of the log file name.
START_TIME = int(time.time())
class SaasError(Exception):
    """Common base class for every error raised by the spamming procedure."""


class FailedToJoinChat(SaasError):
    """Raised by ``spam`` when joining the requested chat fails."""


class FailedToSpam(SaasError):
    """Raised by ``spam`` when sending the spam message fails."""


class NotAGroup(SaasError):
    """Raised by ``spam`` when the requested target is not a valid group."""


class FailedToQuit(SaasError):
    """Raised by ``spam`` when leaving a chat fails."""
def spam(client, chat_id, requester):
    """Join ``chat_id``, post the spam message, then leave again.

    After joining, every dialog returned by ``client.get_dialogs()`` is
    visited. Dialogs whose chat type is ``'group'`` or ``'supergroup'``
    receive ``SPAM`` formatted with ``requester``; ``leave_chat`` is then
    called for the dialog with ``delete=True``.

    Args:
        client: Client object used to join, send and leave.
        chat_id: Value handed to ``client.join_chat`` (username or link).
        requester: Display string substituted for ``{user}`` in ``SPAM``.

    Raises:
        FailedToJoinChat: ``join_chat`` raised a pyrogram ``Error``.
        NotAGroup: ``join_chat`` raised ``IndexError``.
        FailedToSpam: ``send_message`` raised a pyrogram ``Error``.
        FailedToQuit: ``leave_chat`` raised a pyrogram ``Error`` on the
            normal (post-send) path.
    """
    logger = logging.getLogger('Spamming Procedure')
    logger.info('>>> Trying to join %s', chat_id)
    try:
        client.join_chat(chat_id)
    except Error as e:
        raise FailedToJoinChat(e) from e
    except IndexError as e:
        raise NotAGroup('Not a valid group') from e

    for dialog in client.get_dialogs().dialogs:
        chat = dialog.chat
        if chat.type in ('group', 'supergroup'):
            try:
                client.send_message(chat.id, SPAM.format(user=requester))
            except Error as e:
                # Best-effort cleanup: a failure to leave here must not mask
                # the send failure or escape as an unwrapped pyrogram Error,
                # which the caller does not catch.
                try:
                    client.leave_chat(chat.id, delete=True)
                except Error:
                    logger.exception(
                        '>>> Failed to quit %s after a failed send', chat.id
                    )
                raise FailedToSpam(e) from e
        # Leaving is attempted for every dialog, not only the ones spammed.
        try:
            client.leave_chat(chat.id, delete=True)
        except Error as e:
            raise FailedToQuit(e) from e
def spam_wrapper(client, message):
    """Handle one ``'spam ...`` command: validate, run, report and log it.

    The requester is rejected if their previous request finished less than
    60 seconds ago. Otherwise the whitespace-separated targets after the
    command word (one to five of them) are passed to ``spam`` one by one,
    a summary is sent back as a reply, the rate-limit timestamp is updated
    and the event is appended to ``HISTORY`` and written to the JSON log.

    Args:
        client: Client object used to send replies and passed on to ``spam``.
        message: The incoming command message; its ``from_user``, ``chat``,
            ``text`` and ``message_id`` attributes are read.
    """
    logger = logging.getLogger('Spam Controller')
    user = message.from_user

    def reply(text):
        # Every answer goes to the originating chat as a reply to the command.
        client.send_message(
            message.chat.id, text,
            reply_to_message_id=message.message_id
        )

    last_request = RATE_LIMIT.get(user.id)
    if last_request is not None and last_request > (time.time() - 60):
        reply(RATE_LIMIT_NOTICE)
        return

    if user.username:
        requester = '@' + user.username
    else:
        # The id is an integer, so it must be formatted rather than
        # concatenated to the prefix.
        requester = 'ID: %s' % user.id

    # split() without an argument collapses runs of whitespace, so repeated
    # spaces never produce empty-string targets.
    chat_ids = message.text.split()[1:]
    if len(chat_ids) > 5:
        logger.error('>>> Too many arguments')
        reply(TOO_MANY_ARGUMENTS)
        return
    if not chat_ids:
        logger.error('>>> Too few arguments')
        reply(TOO_FEW_ARGUMENTS)
        return

    logger.info('>>> %s requested to spam %s', user.id, chat_ids)
    success = []
    failed = {}
    for chat_id in chat_ids:
        try:
            spam(client, chat_id, requester)
        except (FailedToJoinChat, NotAGroup) as e:
            failed[chat_id] = e
            logger.error('>>> Failed to join %s: %s', chat_id, e)
            continue
        except FailedToSpam as e:
            failed[chat_id] = e
            logger.error('>>> Failed to spam %s: %s', chat_id, e)
            continue
        except FailedToQuit as e:
            # Failing to leave is only logged; the target still counts as
            # a success below.
            logger.error('>>> Failed to quit %s: %s', chat_id, e)
        logger.info('>>> %s succeed.', chat_id)
        success.append(chat_id)

    parts = []
    if success:
        parts.append('>>> 成功: %s\n' % ', '.join(success))
    parts.extend(
        '\n>>> %s 失败: %s\n' % (chat_id, error)
        for chat_id, error in failed.items()
    )
    response = ''.join(parts)
    reply(response if response else UNKNOWN_ERROR)

    RATE_LIMIT[user.id] = int(time.time())
    HISTORY.append({
        'time': time.time(),
        'requester_id': user.id,
        'requester_name': requester,
        'message': message.text,
        'success': success,
        'failed': list(failed),
        'response': response,
    })
    # The whole history is rewritten on every request.
    with open('saas-log-%s.log' % START_TIME, 'w') as log_file:
        json.dump(HISTORY, log_file)
@app.on_message(Filters.text & Filters.private)
def on_message(client, message):
    """Route a private text message to the spam command or the help reply.

    Messages sent by the account itself are ignored. Text starting with
    ``'spam `` is handed to ``spam_wrapper``; anything else is answered
    with ``HELP`` rendered as HTML, as a reply to the incoming message.
    """
    log = logging.getLogger('Message Listener')
    sender = message.from_user
    if sender.is_self:
        return

    if message.text.startswith("'spam "):
        spam_wrapper(client, message)
        return

    log.info('>>> Respond with help message')
    client.send_message(
        message.chat.id,
        HELP,
        reply_to_message_id=message.message_id,
        parse_mode='HTML',
    )
if __name__ == '__main__':
    # Script entry point: print the licence banner, configure logging,
    # refuse to start if this run's log file already exists, then run the
    # client.
    print(LICENSE_NOTICE)
    logging.basicConfig(level=logging.INFO)
    # Keep the library's own logger at WARNING so only the bot's INFO
    # messages are shown.
    logging.getLogger("pyrogram").setLevel(logging.WARNING)
    logger = logging.getLogger(__name__)
    if os.path.exists('saas-log-%s.log' % START_TIME):
        logger.error('>>> Log file already exists.')
        # SystemExit is raised directly: the exit() helper is injected by
        # the site module and is not guaranteed to exist in every run mode.
        raise SystemExit(1)
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.