Skip to content

Instantly share code, notes, and snippets.

@Karasiq
Created December 22, 2023 13:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Karasiq/02503b8bf3aa4c4ba79c89ecde7bbceb to your computer and use it in GitHub Desktop.
Save Karasiq/02503b8bf3aa4c4ba79c89ecde7bbceb to your computer and use it in GitHub Desktop.
Export Telegram group participants (message authors)
import asyncio
import logging
import os
import pathlib
import re
import shutil
from datetime import datetime, timedelta
from telethon import TelegramClient
from telethon.tl.functions.messages import GetDialogsRequest
from telethon.tl.types import InputPeerEmpty, Channel, User, Message, PeerUser
# Session name handed to TelegramClient as its first argument in get_client().
SESSION_FILE = 'save_channel.session'
# Telegram API credentials passed to TelegramClient in get_client().
# NOTE(review): these are hard-coded in source; consider loading them from the
# environment or a local config file that is not committed.
api_id = 1239260
api_hash = 'ba579e042b9b2bdecb1edb6f20e4b15e'
# Module-wide cached client; stays None until get_client() creates it.
_client_ref: TelegramClient = None
async def get_client() -> TelegramClient:
    """Return the shared, started TelegramClient, creating it on first use.

    The client is built from the module-level ``SESSION_FILE``, ``api_id`` and
    ``api_hash`` and then connected and started. The instance is cached in the
    module global ``_client_ref`` so later calls reuse it.

    Returns:
        The cached ``TelegramClient`` instance.

    Raises:
        Whatever ``connect()`` / ``start()`` raise. In that case nothing is
        cached, so a later call will try again from scratch.
    """
    global _client_ref
    if _client_ref is None:
        client = TelegramClient(
            SESSION_FILE, api_id, api_hash,
            device_model="Oppo Find 5",
            system_version="SDK 29",
            lang_code='en',
            system_lang_code='en_US',
            app_version="1.1.2",
            auto_reconnect=True,
            flood_sleep_threshold=30,
            receive_updates=False,
        )
        await client.connect()
        await client.start()
        # Publish the client only once it is fully started; otherwise a failed
        # start would leave a half-initialised client cached for every caller.
        _client_ref = client
    return _client_ref
async def select_channel() -> Channel:
    """List the account's channel dialogs and let the user pick one by index.

    Dialogs are collected from ``client.iter_dialogs()``, keeping only those
    whose ``is_channel`` flag is set, and shown sorted by title. The user is
    prompted on stdin until a valid index is entered.

    Returns:
        The selected item from the sorted list (the object yielded by
        ``iter_dialogs()`` for that channel).

    Raises:
        IndexError: If the account has no channel dialogs to choose from.
    """
    client = await get_client()

    groups = [dialog async for dialog in client.iter_dialogs() if dialog.is_channel]
    groups.sort(key=lambda dialog: dialog.title)
    if not groups:
        # Without this guard the prompt below could never be satisfied.
        raise IndexError('No channels or groups available to select')

    print('[+] Groups:')
    for index, group in enumerate(groups):
        print(f'[{index}] - {group.title}')

    while True:
        answer = input("[+] Enter index: ")
        try:
            # Negative indexes keep working, as with plain list indexing.
            return groups[int(answer)]
        except (ValueError, IndexError):
            print(f'[!] Invalid index: {answer!r}')
async def save_channel():
    """Write the usernames of a chosen channel's message authors to a file.

    Asks the user to pick a channel via ``select_channel()``, then iterates its
    messages and resolves each message's ``from_id`` to an entity. Every
    distinct non-empty username is written once, as ``@username``, to
    ``./save_channel/<sanitised title>/participants.txt``. The file is
    truncated on each run.

    Messages without a sender, senders that cannot be resolved, and entities
    without a username are skipped. Write errors are logged and do not abort
    the run.
    """
    client = await get_client()
    channel = await select_channel()

    # Strip characters that are awkward or invalid in directory names.
    safe_title = re.sub(r'[.,\\/{}\r\n\t|+<>"\'?:]', '', channel.title)
    output_dir = f'./save_channel/{safe_title}'
    os.makedirs(output_dir, exist_ok=True)
    logging.info('Saving channel %s to %s', channel.title, output_dir)

    seen_usernames = set()
    # NOTE(review): confirm against the Telethon docs that offset_date selects
    # the intended side of this timestamp (older vs. newer messages).
    offset = datetime.now() - timedelta(days=14)

    with open(f'{output_dir}/participants.txt', 'w', encoding='utf-8') as output_file:
        async for message in client.iter_messages(channel, offset_date=offset):
            sender_peer = message.from_id
            if sender_peer is None:
                # No author recorded on this message, so nothing to resolve.
                continue

            try:
                sender = await client.get_entity(sender_peer)
            except Exception:
                # Best effort: skip unresolvable senders, but do not swallow
                # KeyboardInterrupt / task cancellation like a bare except.
                logging.debug('Could not resolve sender %r', sender_peer, exc_info=True)
                continue

            # Not every entity type is guaranteed to expose a username.
            username = getattr(sender, 'username', None)
            if username is None or username in seen_usernames:
                continue

            seen_usernames.add(username)
            try:
                output_file.write(f'@{username}\n')
                # Flush per entry so partial results survive an interrupted run.
                output_file.flush()
            except OSError:
                logging.warning('Failed to write username %s', username, exc_info=True)
if __name__ == "__main__":
    # Script entry point: set up root logging, then drive save_channel() to
    # completion on a freshly created event loop.
    log_format = '%(asctime)s : %(filename)s line - %(lineno)d : %(funcName)s : %(name)s : %(levelname)s : %(message)s'
    date_format = '%m/%d/%Y %I:%M:%S %p'
    logging.basicConfig(level=logging.INFO, format=log_format, datefmt=date_format)

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    # run_until_complete wraps the coroutine in a task on this loop itself.
    event_loop.run_until_complete(save_channel())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment