Created
September 23, 2023 06:58
-
-
Save kaecy/62a19841ebfe82e955da3d1508ed7694 to your computer and use it in GitHub Desktop.
TG User Scanner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Date: Sep 3, 2023 | |
import os | |
import tomllib | |
import json | |
from telethon import TelegramClient, events, sync, types, functions | |
from telethon.utils import get_display_name | |
env = tomllib.load(open("env.toml", "rb")) | |
app_id = env['api_id'] | |
app_hash = env['api_hash'] | |
bot_token = env['bot_token'] | |
bot = TelegramClient('bot', app_id, app_hash).start(bot_token=bot_token) | |
channels = json.load(open("channels.json")) | |
groups = json.load(open("groups.json")) | |
enableSearch = True | |
db = { | |
"groups": {}, | |
"channels": {} | |
} | |
if enableSearch: | |
for file in os.listdir("groups"): | |
db["groups"][file.split(".")[0]] = json.load(open("groups\\" + file)) | |
for file in os.listdir("channels"): | |
db["channels"][file.split(".")[0]] = json.load(open("channels\\" + file)) | |
def create_mention(user): | |
return f"[{get_display_name(user)}](tg://user?id={user.id})" | |
async def insert_entry(event): | |
link = event.raw_text.split(" ", 1)[1] | |
try: | |
entity = await bot.get_entity(link) | |
except ValueError: | |
await event.reply("Channel or group not found") | |
return | |
usercount = 0 | |
if type(entity) == types.Channel: | |
if entity.broadcast: | |
if entity.id not in channels: | |
channels[entity.id] = link | |
open("channels.json", "w").write(json.dumps(channels, indent=2)) | |
full = await bot(functions.channels.GetFullChannelRequest(entity)) | |
full_channel = full.full_chat | |
#await event.reply(str(full_channel)) | |
participants = await bot.get_participants(full_channel.linked_chat_id) | |
usercount = participants.total | |
users = [] | |
for user in participants: | |
users.append({ | |
"id": user.id, | |
"first_name": user.first_name, | |
"last_name": user.last_name | |
}) | |
open("channels\\" + str(entity.id) + ".json", "w").write(json.dumps(users, indent=2)) | |
db['channels'][entity.id] = users | |
channels[entity.id] = link | |
else: | |
await event.reply("Already in list.") | |
return | |
else: | |
if entity.id not in groups: | |
groups[entity.id] = link | |
open("groups.json", "w").write(json.dumps(groups, indent=2)) | |
participants = await bot.get_participants(entity) | |
usercount = participants.total | |
users = [] | |
for user in participants: | |
users.append({ | |
"id": user.id, | |
"first_name": user.first_name, | |
"last_name": user.last_name | |
}) | |
open("groups\\" + str(entity.id) + ".json", "w").write(json.dumps(users, indent=2)) | |
db['groups'][entity.id] = users | |
groups[entity.id] = link | |
else: | |
await event.reply("Already in list.") | |
return | |
o = "Title: " + entity.title + "\n" + \ | |
"Type: " + ("Channel" if entity.broadcast else "Group") + "\n" + \ | |
"Users: " + str(usercount) + "\nSaved." | |
#await event.reply(str(entity)) | |
await event.reply(o) | |
async def search_user(event): | |
user = event.raw_text.split(" ", 1)[1] | |
userobj = None | |
groupids, channelids = [], [] | |
if user.startswith("@"): | |
user = (await bot.get_entity(user)).id | |
elif user.startswith("https://t.me/"): | |
user = (await bot.get_entity("@" + user[13:])).id | |
else: | |
if user.isnumeric(): | |
user = int(user) | |
elif not user.startswith("@"): | |
user = (await bot.get_entity("@" + user)).id | |
for groupid, users in db["groups"].items(): | |
for u in users: | |
if user == u['id']: | |
if userobj == None: | |
userobj = u | |
groupids.append(groupid) | |
break | |
for groupid, users in db["channels"].items(): | |
for u in users: | |
if user == u['id']: | |
if userobj == None: | |
userobj = u | |
channelids.append(groupid) | |
break | |
ogroups = "" | |
ochannels = "" | |
for i in groupids: | |
ogroups += groups[i] + "\n" | |
for i in channelids: | |
ochannels += channels[i] + "\n" | |
if groupids or channelids: | |
results = "" | |
if ochannels: | |
results += "\n\nChannels:\n" + ochannels | |
if ogroups: | |
results += "\n\nGroups:\n" + ogroups | |
await event.reply("Human found!\n\nTelegram ID: " + str(user) + "\nName: " + get_display_name(await bot.get_entity(user)) + results, link_preview=False) | |
else: | |
await event.reply("Not found") | |
async def welcome_msg(event): | |
if event.user_joined or event.user_added and not (await event.get_user()).bot: | |
user = await event.get_user() | |
await event.respond("Hello, " + create_mention(user)) | |
@bot.on(events.NewMessage) | |
async def ev(event): | |
if "/i" in event.text: | |
await insert_entry(event) | |
if enableSearch and "/s" in event.text: | |
await search_user(event) | |
@bot.on(events.ChatAction) | |
async def ev(event): | |
await welcome_msg(event) | |
bot.start() | |
print ("Bot has started...") | |
bot.run_until_disconnected() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment