Skip to content

Instantly share code, notes, and snippets.

@RedL0tus
Last active Dec 30, 2018
Embed
What would you like to do?
Telegram Spammer as a Service
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
# Telegram Spammer as a Service - Test how fast your anti-spam bot is
# Copyright (C) 2018 Kay Lin <kaymw@member.fsf.org>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
__author__ = 'Kay Lin <kaymw@member.fsf.org>'
import os
import json
import time
import logging
from pyrogram import Client, Filters, Error
# Module-wide Pyrogram client; the message handler is registered on it and the
# script entry point runs it.
# NOTE(review): 'homo' is presumably the session name - confirm against the
# Pyrogram Client documentation.
app = Client('homo')
# ---------------------------------------------------------------------------
# User-facing message templates. They are sent verbatim, so the text must not
# be reformatted.
# ---------------------------------------------------------------------------

# Body posted into the target group; ``{user}`` is filled in with str.format.
SPAM = '''\
{user} 请求测试本群的 CAPTCHA 系统,下面是 spam 正文:
同性交友,排解寂寞
https://github.com/
此帐号是一个用于测试 CAPTCHA bot 封禁速度的 user bot,详细信息请私聊。
'''

# Usage text sent with parse_mode='HTML' (note the <code> tag). It is never
# passed through str.format, so the literal ``{1,5}`` is safe here.
HELP = '''\
这是一个用以测试群组防 Spam 措施(主要是 CAPTCHA 类)反应速度的 userbot。
使用方法:
<code>'spam [群组链接(如果是私有群)或群组用户名(公开群组)]{1,5}</code>
注:[]{1,5} 为正则语法,表示至少一个最多五个
请不要滥用,请仅在管理员同意之后使用本 bot,也请各位管理员不要封禁此帐号,(不然我会很头疼...)。
本 bot 是开源程序,完整源码可在这里( https://gist.github.com/RedL0tus/bbb5e8d12860a1252f363ddfddad6f10 )找到,使用 GPLv3+ 协议发布,如要重新使用请遵守许可条款。
如遇到问题请咨询 @TheSaltedFish。
'''

# Short one-line replies; each ends with a newline like the longer templates.
UNKNOWN_ERROR = '蜜汁错误\n'
RATE_LIMIT_NOTICE = '您已被流量限制,请一分钟后再试\n'
TOO_FEW_ARGUMENTS = '至少需要一个用户名或链接\n'
TOO_MANY_ARGUMENTS = '一次最多只能 spam 五个群\n'

# Banner printed once when the script starts.
LICENSE_NOTICE = (
    'Spammer as a Service version 1, Copyright (C) 2018 Kay Lin\n'
    'Licensed under the terms of the GNU General Public License v3 or later (GPLv3+)\n'
)

# ---------------------------------------------------------------------------
# Process-wide mutable state.
# ---------------------------------------------------------------------------

# Maps a requester's user id to the timestamp of their last handled request.
RATE_LIMIT = {}
# One dict per handled request; dumped to the JSON log file after each request.
HISTORY = []
# Start-up time in whole seconds; used as the suffix of the log file name.
START_TIME = int(time.time())
class FailedToJoinChat(Exception):
    """Raised by ``spam()`` when joining the requested chat fails with a Pyrogram ``Error``."""
class FailedToSpam(Exception):
    """Raised by ``spam()`` when sending the spam message fails with a Pyrogram ``Error``."""
class NotAGroup(Exception):
    """Raised by ``spam()`` when the join attempt fails with an ``IndexError``."""
class FailedToQuit(Exception):
    """Raised by ``spam()`` when leaving a chat fails with a Pyrogram ``Error``."""
def spam(client, chat_id, requester):
    """Join ``chat_id``, post the spam text into group dialogs, then leave.

    After joining, every dialog returned by ``client.get_dialogs()`` is
    visited: dialogs whose chat type is ``'group'`` or ``'supergroup'``
    receive the ``SPAM`` template, and ``leave_chat(..., delete=True)`` is
    then called for the dialog.

    Args:
        client: Pyrogram client used for all API calls.
        chat_id: Target passed straight to ``client.join_chat``.
        requester: Display name substituted for ``{user}`` in ``SPAM``.

    Raises:
        FailedToJoinChat: ``join_chat`` raised a Pyrogram ``Error``.
        NotAGroup: ``join_chat`` raised ``IndexError``.
        FailedToSpam: ``send_message`` raised a Pyrogram ``Error``.
        FailedToQuit: ``leave_chat`` raised a Pyrogram ``Error`` after a
            dialog was processed.
    """
    logger = logging.getLogger('Spamming Procedure')
    try:
        logger.info('>>> Trying to join %s', chat_id)
        client.join_chat(chat_id)
    except Error as e:
        raise FailedToJoinChat(e) from e
    except IndexError as e:
        raise NotAGroup('Not a valid group') from e

    # NOTE(review): the leave step runs for every dialog, not only for groups;
    # this mirrors the original statement order - confirm it is intended.
    for dialog in client.get_dialogs().dialogs:
        target = dialog.chat.id
        try:
            if dialog.chat.type in ('group', 'supergroup'):
                client.send_message(target, SPAM.format(user=requester))
        except Error as e:
            # Best-effort cleanup: a failure to leave must not replace the
            # FailedToSpam that the caller knows how to handle.
            try:
                client.leave_chat(target, delete=True)
            except Error as cleanup_error:
                logger.error(
                    '>>> Failed to leave %s after spam failure: %s',
                    target, cleanup_error
                )
            raise FailedToSpam(e) from e
        try:
            client.leave_chat(target, delete=True)
        except Error as e:
            raise FailedToQuit(e) from e
def spam_wrapper(client, message):
    """Handle one ``'spam`` request: validate it, run it and report back.

    A sender whose previous handled request is less than 60 seconds old gets
    ``RATE_LIMIT_NOTICE`` and nothing else happens. Otherwise each
    whitespace-separated argument after the command word (one to five of
    them) is passed to ``spam()``. The per-target outcome is sent back as a
    reply, the sender's rate-limit timestamp is refreshed, and the complete
    in-memory ``HISTORY`` is rewritten to ``saas-log-<START_TIME>.log``.

    Args:
        client: Client used to send replies; also handed on to ``spam()``.
        message: Incoming message; ``from_user``, ``chat.id``,
            ``message_id`` and ``text`` are read from it.
    """
    logger = logging.getLogger('Spam Controller')
    user = message.from_user

    def reply(text):
        # Every answer goes to the requesting chat as a reply to the request.
        client.send_message(
            message.chat.id, text,
            reply_to_message_id=message.message_id
        )

    last_request = RATE_LIMIT.get(user.id)
    if last_request is not None and last_request > (time.time() - 60):
        reply(RATE_LIMIT_NOTICE)
        return

    if user.username:
        requester = '@' + user.username
    else:
        # Format instead of concatenating: "'ID: ' + user.id" raises
        # TypeError whenever the id is not a str (Telegram ids are numeric).
        requester = 'ID: %s' % user.id

    # split() without an argument collapses repeated whitespace, so a double
    # space no longer produces an empty target.
    chat_ids = message.text.split()[1:]
    if len(chat_ids) > 5:
        logger.error('>>> Too many arguments')
        reply(TOO_MANY_ARGUMENTS)
        return
    if not chat_ids:
        logger.error('>>> Too few arguments')
        reply(TOO_FEW_ARGUMENTS)
        return

    logger.info('>>> %s requested to spam %s', user.id, chat_ids)
    success = []
    failed = {}
    for chat_id in chat_ids:
        try:
            spam(client, chat_id, requester)
        except (FailedToJoinChat, NotAGroup) as e:
            failed[chat_id] = e
            logger.error('>>> Failed to join %s: %s', chat_id, e)
            continue
        except FailedToSpam as e:
            failed[chat_id] = e
            logger.error('>>> Failed to spam %s: %s', chat_id, e)
            continue
        except FailedToQuit as e:
            # Only leaving failed; the target is still reported as a success.
            logger.error('>>> Failed to quit %s: %s', chat_id, e)
        logger.info('>>> %s succeed.', chat_id)
        success.append(chat_id)

    parts = []
    if success:
        parts.append('>>> 成功: %s\n' % ', '.join(success))
    parts.extend(
        '\n>>> %s 失败: %s\n' % (target, error)
        for target, error in failed.items()
    )
    response = ''.join(parts)
    # Every target lands in either list, so the fallback is purely defensive.
    reply(response if response else UNKNOWN_ERROR)

    RATE_LIMIT[user.id] = int(time.time())
    HISTORY.append({
        'time': time.time(),
        'requester_id': user.id,
        'requester_name': requester,
        'message': message.text,
        'success': success,
        'failed': list(failed),
        'response': response,
    })
    # The file is rewritten in full each time so it always holds valid JSON.
    with open('saas-log-%s.log' % START_TIME, 'w', encoding='utf-8') as log_file:
        json.dump(HISTORY, log_file)
@app.on_message(Filters.text & Filters.private)
def on_message(client, message):
    """Dispatch a private text message to the spam handler or the help reply.

    Messages sent by the account itself are ignored. Text starting with
    ``'spam `` (apostrophe, ``spam``, space) is forwarded to
    ``spam_wrapper``; anything else is answered with ``HELP``.

    Args:
        client: Client that received the message; used to reply.
        message: The incoming message.
    """
    log = logging.getLogger('Message Listener')

    # Never react to our own outgoing messages.
    if message.from_user.is_self:
        return

    if message.text.startswith("'spam "):
        spam_wrapper(client, message)
        return

    log.info('>>> Respond with help message')
    # HELP contains a <code> tag, hence the HTML parse mode.
    reply_options = {
        'reply_to_message_id': message.message_id,
        'parse_mode': 'HTML',
    }
    client.send_message(message.chat.id, HELP, **reply_options)
if __name__ == '__main__':
    # Script entry point: print the licence banner, configure logging, refuse
    # to start if this run's log file already exists, then run the client.
    print(LICENSE_NOTICE)
    logging.basicConfig(level=logging.INFO)
    # Keep the library quieter than the bot's own loggers.
    logging.getLogger("pyrogram").setLevel(logging.WARNING)
    logger = logging.getLogger(__name__)
    if os.path.exists('saas-log-%s.log' % START_TIME):
        logger.error('>>> Log file already exists.')
        # exit() is an interactive helper installed by the site module and is
        # not meant for programs; raising SystemExit is always available.
        raise SystemExit(1)
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment