Created
November 14, 2022 01:33
-
-
Save flipdazed/f6df9fc6339ab48e0bb57f7ef301a5b1 to your computer and use it in GitHub Desktop.
Grab all contacts from Telegram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Expects a config file containing the app data from https://my.telegram.org/apps
to be saved at ``'~/.telegram/conf.yaml'``.

Only ``cgrab.id`` and ``cgrab.hash`` are read by this script.

```yaml
cgrab:
  id: <yourId>
  hash: <yourHash>
  servers:
    test:
      ip: some_ip
      pubkey: |
        -----BEGIN RSA PUBLIC KEY-----
        <STUFF>
        -----END RSA PUBLIC KEY-----
    prod:
      ip: some_ip
      pubkey: |
        -----BEGIN RSA PUBLIC KEY-----
        <STUFF>
        -----END RSA PUBLIC KEY-----
```
"""
import os | |
import yaml | |
import telethon | |
from telethon import TelegramClient | |
from tqdm import tqdm | |
from typing import Dict | |
# Path of the YAML file holding the application credentials.
CONF = os.path.join(os.path.expanduser('~'), '.telegram', 'conf.yaml')

# Decode explicitly as UTF-8 so the result does not depend on the platform's
# default locale encoding.
with open(CONF, encoding='utf-8') as f:
    config = yaml.safe_load(f)

# Use your own values from my.telegram.org
api_id = config['cgrab']['id']
api_hash = config['cgrab']['hash']
def split_company_by(name: str, by=('//', '|')):
    """Split a trailing company tag off a display name.

    Each separator in ``by`` is tried in order.  When a separator occurs in
    the current name, the text after its last occurrence is appended to the
    company, and the remaining pieces are joined back together (without the
    separator) to become the new name.

    Args:
        name: The raw name, e.g. ``'John // Acme'``.
        by: Iterable of separator strings.  The default is an immutable
            tuple so it cannot be modified between calls.

    Returns:
        A ``(company, name)`` tuple with surrounding whitespace stripped
        from both parts.  ``company`` is ``''`` when no separator is found.
    """
    company = ''
    for separator in by:
        pieces = name.split(separator)
        if len(pieces) > 1:
            # Last piece is the company; everything before it stays the name.
            company += pieces[-1]
            name = ''.join(pieces[:-1])
    return company.strip(), name.strip()
def get_user_details(user: telethon.tl.types.User) -> Dict[str, object]:
    """Build a flat contact record from a Telegram user object.

    A company tag embedded in the first or last name is stripped out with
    ``split_company_by``; the tags found in both names are concatenated
    (first-name tag first) into the ``company`` field.

    Args:
        user: The user whose ``first_name``, ``last_name``, ``username``,
            ``phone`` and ``mutual_contact`` attributes are read.

    Returns:
        A dict with the keys ``first_name``, ``last_name``, ``telegram``,
        ``phone``, ``mutual_contact`` and ``company``.  The name and company
        values are strings; the other values are passed through unchanged
        from ``user`` and are therefore not guaranteed to be strings.
    """
    # A missing name part is treated as an empty string.
    company_from_first, first_name = split_company_by(user.first_name or '')
    company_from_last, last_name = split_company_by(user.last_name or '')
    return {
        'first_name': first_name,
        'last_name': last_name,
        'telegram': user.username,
        'phone': user.phone,
        'mutual_contact': user.mutual_contact,
        'company': company_from_first + company_from_last,
    }
# A direct chat only counts once it holds this many messages.
# NOTE(review): the original comment said "at least one message", but the
# original check (0-based index > 1) required three; that behaviour is kept.
MIN_MESSAGES = 3

# Groups with more participants than this are skipped entirely.
GROUP_PARTICIPANT_LIMIT = 20


async def _collect_contacts(client):
    """Gather contact records from every dialog of a connected client.

    * Direct chats: the peer is recorded when it is one of your saved
      contacts and the chat holds at least ``MIN_MESSAGES`` messages.
    * Groups: every participant is recorded, unless the group has more than
      ``GROUP_PARTICIPANT_LIMIT`` participants, in which case the whole
      group is skipped.

    Args:
        client: A started ``TelegramClient``.

    Returns:
        A list of dicts as produced by ``get_user_details``.
    """
    collected = []
    # Fetch the dialog list once; its length gives tqdm the total.
    dialogs = await client.get_dialogs()
    for dialog in tqdm(dialogs):
        header = dialog.date.strftime('%d-%b-%Y') + ' : ' + dialog.title
        tqdm.write(header)
        # if the person is a user and you have added them in contacts
        if dialog.is_user and dialog.entity.contact:
            # Only as many messages as the threshold needs are requested.
            recent = await client.get_messages(dialog, limit=MIN_MESSAGES)
            if len(recent) >= MIN_MESSAGES:
                collected.append(get_user_details(dialog.entity))
        elif dialog.is_group:
            members = []
            async for participant in client.iter_participants(dialog):
                members.append(get_user_details(participant))
                if len(members) > GROUP_PARTICIPANT_LIMIT:
                    # Pad so the note lines up under the dialog header.
                    tqdm.write(len(header) * ' ' + ' > exceeded reasonable size!')
                    members = []
                    break
            collected.extend(members)
    return collected


# The first parameter is the .session file name (absolute paths allowed)
client = TelegramClient('session_name', api_id, api_hash)

# ``with client`` starts the client (connecting and logging in as needed)
# and disconnects it again when the block exits.
with client:
    contacts = client.loop.run_until_complete(_collect_contacts(client))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment