Created
February 9, 2024 17:52
-
-
Save kaecy/92d864e2dbde3402630a49186a39c2f9 to your computer and use it in GitHub Desktop.
User Search module for automatically storing, updating, and searching the groups and channels users belong to, together with working bot code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Date: Feb 9, 2024 | |
import os | |
import tomllib | |
import json | |
import usersearch | |
import poster | |
from telethon import TelegramClient, events, sync, types, functions | |
from telethon.utils import get_display_name | |
# Load the Telegram credentials from env.toml. tomllib needs a binary file
# object, and the context manager closes the handle once parsing is done.
with open("env.toml", "rb") as env_file:
    env = tomllib.load(env_file)

app_id = env['app_id']
app_hash = env['app_hash']
bot_token = env['bot_token']

# Feature flag checked by the NewMessage handler before it serves /search.
enableSearch = True

# Passing bot_token to start() logs the client in as a bot; the session is
# stored under the name 'bot'.
bot = TelegramClient('bot', app_id, app_hash).start(bot_token=bot_token)
def create_mention(user):
    """Build a Markdown inline mention for *user*.

    The link text is the user's display name and the target is the
    ``tg://user?id=<id>`` URL built from ``user.id``.
    """
    label = get_display_name(user)
    return "[{}](tg://user?id={})".format(label, user.id)
async def welcome_msg(event):
    """Greet a non-bot user who joined, or was added to, a chat.

    Args:
        event: The chat-action event; its ``user_joined`` / ``user_added``
            flags and ``get_user()`` coroutine are used.

    The original condition mixed ``or`` and ``and`` without parentheses, so
    the bot check only applied to the "added" case.  Here the bot check is
    applied to both cases and the user is fetched only once.
    """
    if not (event.user_joined or event.user_added):
        return

    user = await event.get_user()
    # Skip when no user could be fetched for the action, and never greet bots.
    if user is None or user.bot:
        return

    await event.respond("Hello, " + create_mention(user))
async def _handle_tinsert(event):
    """Store the member list of the channel/group named in a /tinsert message."""
    parts = event.raw_text.split(" ", 1)
    link = parts[1].strip() if len(parts) > 1 else ""
    if not link:
        # No argument given: nothing to look up.
        await event.reply("Channel or group not found")
        return

    try:
        entity = await bot.get_entity(link)
    except ValueError:
        await event.reply("Channel or group not found")
        return

    # Only Channel objects carry ``broadcast`` and are accepted by
    # usersearch.insert_entry; anything else (e.g. a user) is rejected here.
    if not isinstance(entity, types.Channel):
        await event.reply("Channel or group not found")
        return

    if entity.broadcast:
        # For a broadcast channel the members are read from its linked chat.
        full = await bot(functions.channels.GetFullChannelRequest(entity))
        linked_chat_id = full.full_chat.linked_chat_id
        if linked_chat_id is None:
            await event.reply("Channel has no linked discussion group")
            return
        participants = await bot.get_participants(types.PeerChannel(linked_chat_id))
    else:
        participants = await bot.get_participants(entity)

    result = usersearch.insert_entry(participants, entity, link)
    if result[0] == 0:
        kind = "Channel" if entity.broadcast else "Group"
        response = "Title: " + entity.title + "\n" + \
            "Type: " + kind + "\n" + \
            "Users: " + str(result[1]) + "\nSaved."
        await event.reply(response)
    else:
        await event.reply("Updated. Added " + str(result[1]) + " users.")


async def _resolve_search_target(query):
    """Turn a /search argument into an object that exposes ``id``.

    Accepts ``@username``, ``https://t.me/<username>``, a bare username or a
    numeric ID.  Raises ValueError when a username cannot be resolved.
    """
    if query.startswith("@"):
        return await bot.get_entity(query)
    if query.startswith("https://t.me/"):
        return await bot.get_entity("@" + query[len("https://t.me/"):])
    if query.isnumeric():
        user_id = int(query)
        try:
            return await bot.get_entity(user_id)
        except ValueError:
            # The ID cannot be resolved by the client, but the locally stored
            # member lists can still be searched: that only needs ``.id``.
            from types import SimpleNamespace
            return SimpleNamespace(id=user_id)
    return await bot.get_entity("@" + query)


async def _handle_search(event):
    """Reply with the stored groups/channels of the user named in /search."""
    parts = event.text.split(" ", 1)
    query = parts[1].strip() if len(parts) > 1 else ""
    if not query:
        await event.reply("User not found.")
        return

    try:
        target = await _resolve_search_target(query)
    except ValueError:
        await event.reply("User not found.")
        return

    result = usersearch.search(target)
    if result[0] != 0:
        await event.reply("User not found.")
        return

    name = get_display_name(target)
    if not name:
        # Fall back to the name saved with the matching stored record.
        stored = result[1]
        name = " ".join(filter(None, (stored.get("first_name"), stored.get("last_name"))))

    text = "Human found!\n\nTelegram ID: " + str(target.id) + "\nName: "
    await event.reply(text + name + result[2], link_preview=False)


@bot.on(events.NewMessage)
async def ev(event):
    """Dispatch the bot's text commands found in an incoming message.

    Commands handled:
        /insert <entry>   queue the entry for approval
        /tinsert <link>   store the members of a channel or group
        /search <user>    list stored channels/groups for a user
                          (only while ``enableSearch`` is true)
        /omdb:<query>     respond with whatever ``poster.get`` returns

    NOTE(review): /insert, /tinsert and /search are matched anywhere in the
    message (substring test), as in the original code - confirm that this is
    intended rather than a prefix match.
    """
    if "/insert" in event.text:
        usersearch.add_entry_for_approval(event.text.replace("/insert ", ""))
        await event.respond("Added for approval.")

    if "/tinsert" in event.text:
        await _handle_tinsert(event)

    if enableSearch and "/search" in event.text:
        await _handle_search(event)

    if event.text.startswith("/omdb:"):
        posterData = poster.get(event.text.replace("/omdb:", ""))
        if posterData is not None:
            # posterData[0] is sent as the message, posterData[1] as the file.
            await event.respond(posterData[0], file=posterData[1])
@bot.on(events.ChatAction)
async def ev(event):
    """Pass every chat-action event on to ``welcome_msg``."""
    greeting = welcome_msg(event)
    await greeting
# Start the client, announce it on stdout, then keep running until the
# client disconnects.
bot.start()
print("Bot has started...")
bot.run_until_disconnected()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Get this from my.telegram.org/auth | |
app_id = | |
app_hash = | |
# Get this from @botfather by creating a bot | |
bot_token = |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# User Search module | |
import os | |
import json | |
from telethon import types | |
def _load_json(path, default):
    """Return the parsed JSON stored at *path*, or *default* if it is missing."""
    try:
        with open(path) as handle:
            return json.load(handle)
    except FileNotFoundError:
        return default


def _load_member_lists(directory):
    """Load every file in *directory* as JSON, keyed by the name before the first dot.

    The directory is created (and an empty mapping returned) when it does not
    exist yet.  Paths are built with os.path.join so this works on any OS;
    the previous hard-coded backslash only worked on Windows.
    """
    try:
        filenames = os.listdir(directory)
    except FileNotFoundError:
        os.mkdir(directory)
        return {}

    loaded = {}
    for filename in filenames:
        with open(os.path.join(directory, filename)) as handle:
            loaded[filename.split(".")[0]] = json.load(handle)
    return loaded


# Member lists per entity id, read from the "groups" and "channels" folders.
db = {
    "groups": _load_member_lists("groups"),
    "channels": _load_member_lists("channels"),
}

# Maps written by insert_entry: entity id -> the string it was inserted with.
channels = _load_json("channels.json", {})
groups = _load_json("groups.json", {})

# Entries queued by add_entry_for_approval.
approvals = _load_json("approvals.json", [])
def add_entry_for_approval(entityString):
    """Append *entityString* to the approval queue and save it to approvals.json.

    Args:
        entityString: The text to queue; it is stored as given.
    """
    approvals.append(entityString)
    # The context manager closes the file even if serialisation fails.
    with open("approvals.json", "w") as out:
        out.write(json.dumps(approvals, indent=2))
def _write_json(path, payload):
    """Serialise *payload* as indented JSON into *path*, closing the file."""
    with open(path, "w") as out:
        out.write(json.dumps(payload, indent=2))


def insert_entry(participants, entity, entityString):
    """Store or extend the saved member list of a channel or group.

    Args:
        participants: Iterable of users exposing ``id``, ``first_name`` and
            ``last_name``.
        entity: The ``types.Channel`` the participants belong to; its
            ``broadcast`` flag selects the "channels" or "groups" store.
        entityString: The string saved for this entity in the id map
            (channels.json / groups.json).

    Returns:
        ``(0, n)`` when the entity was new and ``n`` users were saved, or
        ``(1, n)`` when it already existed and ``n`` new users were added.

    Raises:
        TypeError: If *entity* is not a ``types.Channel``.
    """
    if not isinstance(entity, types.Channel):
        raise TypeError(
            "insert_entry expects a Channel entity, got " + type(entity).__name__
        )

    if entity.broadcast:
        uniqueEntityMap, entityType = channels, "channels"
    else:
        uniqueEntityMap, entityType = groups, "groups"

    # Always key by the string id: that is what the JSON files hold after a
    # reload, and mixing int and str keys made a second insert in the same
    # session look like a brand-new entity.
    entity_key = str(entity.id)
    members_path = os.path.join(entityType, entity_key + ".json")

    users = [
        {
            "id": user.id,
            "first_name": user.first_name,
            "last_name": user.last_name,
        }
        for user in participants
    ]

    if entity_key not in uniqueEntityMap:
        db[entityType][entity_key] = users
        uniqueEntityMap[entity_key] = entityString
        _write_json(members_path, users)
        _write_json(entityType + ".json", uniqueEntityMap)
        return (0, len(users))

    # Already known: append only users whose id is not stored yet.
    stored = db[entityType].setdefault(entity_key, [])
    known_ids = {member["id"] for member in stored}
    newUsers = []
    for user in users:
        if user["id"] not in known_ids:
            known_ids.add(user["id"])
            newUsers.append(user)
    stored.extend(newUsers)
    _write_json(members_path, stored)
    return (1, len(newUsers))
def search(userEntity):
    """Look up which stored groups and channels contain ``userEntity.id``.

    Returns:
        ``(0, record, text)`` when at least one match exists, where *record*
        is the first stored user entry that matched (groups are scanned
        before channels) and *text* lists the matching channels and groups;
        ``(1,)`` when nothing matched.
    """
    first_record = None
    hits = {"groups": [], "channels": []}

    for section in ("groups", "channels"):
        for entity_id, members in db[section].items():
            for member in members:
                if userEntity.id != member['id']:
                    continue
                if first_record is None:
                    first_record = member
                hits[section].append(entity_id)
                # One hit per entity is enough.
                break

    group_lines = "".join(groups[gid] + "\n" for gid in hits["groups"])
    channel_lines = "".join(channels[cid] + "\n" for cid in hits["channels"])

    if not (hits["groups"] or hits["channels"]):
        return (1,)

    sections = []
    if channel_lines:
        sections.append("\n\nChannels:\n" + channel_lines)
    if group_lines:
        sections.append("\n\nGroups:\n" + group_lines)
    return (0, first_record, "".join(sections))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment