Skip to content

Instantly share code, notes, and snippets.

@kaecy
Created February 9, 2024 17:52
Show Gist options
  • Save kaecy/92d864e2dbde3402630a49186a39c2f9 to your computer and use it in GitHub Desktop.
Save kaecy/92d864e2dbde3402630a49186a39c2f9 to your computer and use it in GitHub Desktop.
User Search module for automated storing, updating and searching users' groups and working bot code
# Date: Feb 9, 2024
import os
import tomllib
import json
import usersearch
import poster
from telethon import TelegramClient, events, sync, types, functions
from telethon.utils import get_display_name
env = tomllib.load(open("env.toml", "rb"))
app_id = env['app_id']
app_hash = env['app_hash']
bot_token = env['bot_token']
enableSearch = True
bot = TelegramClient('bot', app_id, app_hash).start(bot_token=bot_token)
def create_mention(user):
return f"[{get_display_name(user)}](tg://user?id={user.id})"
async def welcome_msg(event):
if event.user_joined or event.user_added and not (await event.get_user()).bot:
user = await event.get_user()
await event.respond("Hello, " + create_mention(user))
@bot.on(events.NewMessage)
async def ev(event):
if "/insert" in event.text:
usersearch.add_entry_for_approval(event.text.replace("/insert ", ""))
await event.respond("Added for approval.")
if "/tinsert" in event.text:
link = event.raw_text.split(" ", 1)[1]
try:
entity = await bot.get_entity(link)
except ValueError:
await event.reply("Channel or group not found")
return
if entity.broadcast:
full = await bot(functions.channels.GetFullChannelRequest(entity))
full_channel = full.full_chat
participants = await bot.get_participants(full_channel.linked_chat_id)
else:
participants = await bot.get_participants(entity)
result = usersearch.insert_entry(participants, entity, link)
if result[0] == 0:
response = "Title: " + entity.title + "\n" + \
"Type: " + ("Channel" if entity.broadcast else "Group") + "\n" + \
"Users: " + str(result[1]) + "\nSaved."
await event.reply(response)
else:
await event.reply("Updated. Added " + str(result[1]) + " users.")
if enableSearch and "/search" in event.text:
userEntity = event.text.split(" ", 1)[1]
if userEntity.startswith("@"):
userEntity = (await bot.get_entity(userEntity))
elif userEntity.startswith("https://t.me/"):
userEntity = (await bot.get_entity("@" + userEntity[13:]))
else:
if userEntity.isnumeric():
userEntity = int(userEntity)
elif not userEntity.startswith("@"):
userEntity = (await bot.get_entity("@" + userEntity))
result = usersearch.search(userEntity)
if result[0] == 0:
text = "Human found!\n\nTelegram ID: " + str(userEntity.id) + "\nName: "
#entity = await bot.get_entity(userEntity)
await event.reply(text + get_display_name(userEntity) + result[2], link_preview=False)
else:
await event.reply("User not found.")
if event.text.startswith("/omdb:"):
posterData = poster.get(event.text.replace("/omdb:", ""))
if posterData != None:
await event.respond(posterData[0], file=posterData[1])
@bot.on(events.ChatAction)
async def ev(event):
await welcome_msg(event)
bot.start()
print ("Bot has started...")
bot.run_until_disconnected()
# Get this from my.telegram.org/auth
app_id =
app_hash =
# Get this from @botfather by creating a bot
bot_token =
# User Search module
import os
import json
from telethon import types
db = {
"groups": {},
"channels": {}
}
try:
channels = json.load(open("channels.json"))
except FileNotFoundError:
channels = {}
try:
groups = json.load(open("groups.json"))
except FileNotFoundError:
groups = {}
try:
approvals = json.load(open("approvals.json"))
except FileNotFoundError:
approvals = []
try:
for file in os.listdir("groups"):
db["groups"][file.split(".")[0]] = json.load(open("groups\\" + file))
except FileNotFoundError:
os.mkdir("groups")
try:
for file in os.listdir("channels"):
db["channels"][file.split(".")[0]] = json.load(open("channels\\" + file))
except FileNotFoundError:
os.mkdir("channels")
def add_entry_for_approval(entityString):
approvals.append(entityString)
open("approvals.json", "w").write(json.dumps(approvals, indent=2))
def insert_entry(participants, entity, entityString):
if type(entity) == types.Channel:
if entity.broadcast:
uniqueEntityMap = channels
entityType = "channels"
else:
uniqueEntityMap = groups
entityType = "groups"
users = []
for user in participants:
users.append({
"id": user.id,
"first_name": user.first_name,
"last_name": user.last_name
})
if str(entity.id) not in uniqueEntityMap:
db[entityType][entity.id] = users
uniqueEntityMap[entity.id] = entityString
open(entityType + "\\" + str(entity.id) + ".json", "w").write(json.dumps(users, indent=2))
open(entityType + ".json", "w").write(json.dumps(uniqueEntityMap, indent=2))
return (0, len(users))
else:
# already in list
newUsers = []
for user in users:
if user not in db[entityType][str(entity.id)]:
newUsers.append(user)
db[entityType][str(entity.id)].extend(newUsers)
open(entityType + "\\" + str(entity.id) + ".json", "w").write(json.dumps(db[entityType][str(entity.id)], indent=2))
return (1, len(newUsers))
def search(userEntity):
userobj = None
groupids, channelids = [], []
for groupid, users in db["groups"].items():
for u in users:
if userEntity.id == u['id']:
if userobj == None:
userobj = u
groupids.append(groupid)
break
for groupid, users in db["channels"].items():
for u in users:
if userEntity.id == u['id']:
if userobj == None:
userobj = u
channelids.append(groupid)
break
ogroups = ""
ochannels = ""
for i in groupids:
ogroups += groups[i] + "\n"
for i in channelids:
ochannels += channels[i] + "\n"
if groupids or channelids:
result = ""
if ochannels:
result += "\n\nChannels:\n" + ochannels
if ogroups:
result += "\n\nGroups:\n" + ogroups
return (0, userobj, result)
else:
return (1,)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment