|
import asyncio |
|
import collections |
|
import sys |
|
import traceback |
|
from typing import Iterable, Deque, MutableMapping, AsyncGenerator, Optional |
|
|
|
import pyrogram |
|
import pyrogram.errors |
|
from pyrogram import Client, filters |
|
|
|
patterns = [ |
|
'have sex', |
|
'date4ubot', |
|
'find a girl' |
|
'get a girlfriend', |
|
'wanna to', |
|
'get a girl' |
|
] |
|
|
|
ban_permissions = pyrogram.types.ChatPermissions( |
|
can_send_messages=False, |
|
can_send_media_messages=False, |
|
can_send_stickers=False, |
|
can_send_animations=False, |
|
can_send_games=False, |
|
can_invite_users=False, |
|
can_pin_messages=False, |
|
can_send_polls=False, |
|
can_change_info=False, |
|
can_use_inline_bots=False, |
|
can_add_web_page_previews=False |
|
) |
|
|
|
from config import * |
|
|
|
app = Client('userbot', api_id, api_hash) |
|
|
|
history: Deque[pyrogram.types.Message] = collections.deque([], 20) |
|
banned_users: Deque[pyrogram.types.User] = collections.deque([], 20) |
|
reported_users: Deque[pyrogram.types.User] = collections.deque([], 20) |
|
|
|
full_user_cache: MutableMapping[int, pyrogram.raw.types.UserFull] = {} |
|
|
|
|
|
async def get_full_users(client: pyrogram.Client, user_ids: Iterable[int]) \ |
|
-> AsyncGenerator[pyrogram.raw.types.UserFull, None]: |
|
user_ids = list(user_ids) |
|
input_users = list(await asyncio.gather(*[client.resolve_peer(i) for i in user_ids])) |
|
|
|
for user_id, input_user in zip(list(user_ids), list(input_users)): |
|
if user_id in full_user_cache: |
|
yield full_user_cache[user_id] |
|
continue |
|
|
|
while True: |
|
try: |
|
full_user = await client.send(pyrogram.raw.functions.users.GetFullUser(id=input_user)) |
|
break |
|
except pyrogram.errors.exceptions.flood_420.FloodWait as exc: |
|
print(f"GetFullUser ratelimited for {exc.x} seconds, waiting") |
|
await asyncio.sleep(exc.x + 1) |
|
|
|
full_user_cache[user_id] = full_user |
|
yield full_user |
|
|
|
|
|
async def delete_message_report_later(client: pyrogram.Client, message_id): |
|
await asyncio.sleep(10) |
|
await client.delete_messages(chat_id, message_id) |
|
|
|
|
|
def fullname(user: pyrogram.types.User) -> str: |
|
return " ".join(filter(lambda i: bool(i), (user.first_name, user.last_name))) or "[no name]" |
|
|
|
|
|
async def report_ban(client: pyrogram.Client, user: pyrogram.types.User, bio: str): |
|
if user in reported_users: |
|
return |
|
|
|
await client.send_message(log_chat_id, |
|
f"User banned: @{user.username} - {fullname(user)} - ID: {user.id}\nBio: <i>{bio}</i>") |
|
to_delete = await client.send_message(chat_id, f"Spam bot nuked: @{user.username}") |
|
reported_users.append(user) |
|
asyncio.create_task(delete_message_report_later(client, to_delete.message_id)) |
|
|
|
|
|
async def ban(client: pyrogram.Client, user: pyrogram.types.User, message: Optional[pyrogram.types.Message] = None): |
|
print(f"Banning @{user.username} - {fullname(user)}") |
|
|
|
if user not in banned_users: |
|
banned_users.append(user) |
|
|
|
await client.restrict_chat_member(chat_id, user.id, ban_permissions) |
|
await client.delete_user_history(chat_id, user.id) |
|
svc_message = await client.kick_chat_member(chat_id, user.id) |
|
to_delete = [] |
|
if message: |
|
to_delete.append(message.message_id) |
|
if type(svc_message) == pyrogram.types.Message: |
|
to_delete.append(svc_message.message_id) |
|
if to_delete: |
|
await client.delete_messages(chat_id, message_ids=to_delete, revoke=True) |
|
|
|
|
|
async def cleanup(client: pyrogram.Client): |
|
to_delete = [] |
|
banned_usernames = list(filter(lambda u: bool(u), map(lambda u: u.username, banned_users))) |
|
banned_user_ids = list(map(lambda u: u.id, banned_users)) |
|
|
|
for message in history: |
|
if (message.text or message.caption) and message.from_user and message.from_user.id in bot_user_ids: |
|
for username in banned_usernames: |
|
if username.lower() in (message.text or message.caption).lower(): |
|
to_delete.append(message) |
|
|
|
if message.service and (message.new_chat_members or message.left_chat_member): |
|
new_left_users = (message.new_chat_members and list(message.new_chat_members) or []) + ( |
|
message.left_chat_member and list(message.left_chat_member) or []) |
|
new_left_ids = map(lambda u: u.id, new_left_users) |
|
for user_id in new_left_ids: |
|
if user_id in banned_user_ids: |
|
to_delete.append(message) |
|
break |
|
|
|
for message in to_delete: |
|
# noinspection PyBroadException |
|
try: |
|
print(f"Deleting message from @{message.from_user.username}") |
|
except Exception: |
|
pass |
|
if message in history: |
|
history.remove(message) |
|
|
|
await client.delete_messages(chat_id, list(map(lambda msg: msg.message_id, to_delete))) |
|
|
|
|
|
async def report_error(client: pyrogram.Client, text): |
|
print(text, file=sys.stderr) |
|
await client.send_message(log_chat_id, text) |
|
|
|
|
|
# noinspection PyBroadException |
|
async def on_new_members(client: pyrogram.Client, message: pyrogram.types.Message): |
|
try: |
|
user_ids = map(lambda u: u.id, message.new_chat_members) |
|
# workaround to catch all exceptions here |
|
full_users = [] |
|
async for i in get_full_users(client, user_ids): |
|
full_users.append(i) |
|
except Exception: |
|
await report_error(client, f"Unable to retrieve user bios\n\n```\n{traceback.format_exc()}\n```") |
|
traceback.print_exc() |
|
return |
|
|
|
for user in full_users: |
|
if not user.about: |
|
continue |
|
bio = user.about.lower() |
|
for pattern in patterns: |
|
if pattern in bio: |
|
try: |
|
await ban(client, user.user, message) |
|
await report_ban(client, user.user, user.about) |
|
|
|
# Shieldy likes to undo the ban. Ban again after some time |
|
async def ban_later(): |
|
await asyncio.sleep(120) |
|
await ban(client, user.user, None) |
|
|
|
asyncio.create_task(ban_later()) |
|
except Exception: |
|
await report_error(client, |
|
f"Unable to ban user @{user.user.username}, {fullname(user.user)}, " |
|
f"ID: {user.user.id}\nBio: <i>{user.about}</i>\n\n" |
|
f"```\n{traceback.format_exc()}\n```") |
|
break |
|
|
|
|
|
@app.on_message(filters=filters.chat(chat_id)) |
|
async def on_message(client: pyrogram.Client, message): |
|
if message.new_chat_members: |
|
await on_new_members(client, message) |
|
else: |
|
history.append(message) |
|
|
|
await cleanup(client) |
|
|
|
|
|
if __name__ == "__main__": |
|
app.run() |