Telegram batch file downloader (with deduplication and file date preservation)
#!/usr/bin/env python3 | |
import hashlib | |
import os | |
import time | |
import traceback | |
from datetime import datetime | |
import argparse | |
import re | |
from telethon import TelegramClient, helpers, utils | |
def get_name(msg, path):
    """Build the local path under which a message's file is linked.

    The file name is ``<message id>_<name><ext>``. ``<name>`` is the first
    candidate returned by ``client._get_kind_and_names`` for the document's
    attributes; when there is no candidate (for example media without a
    document), the name is just the message id. A missing extension is
    filled in with ``utils.get_extension``.

    Args:
        msg: Message whose ``media`` and ``id`` are used to build the name.
        path: Directory the generated file name is joined onto.

    Returns:
        ``path`` joined with the generated file name.
    """
    # Not every media type carries a document, so look it up defensively
    # instead of failing with AttributeError.
    document = getattr(msg.media, 'document', None)
    attributes = getattr(document, 'attributes', None) or []

    # NOTE: relies on the module-level ``client`` and on a private Telethon
    # helper; only the candidate names are needed here.
    _kind, possible_names = client._get_kind_and_names(attributes)

    # The candidate name is chosen by the sender: keep only its last path
    # component so separators cannot redirect the result outside ``path``.
    safe_name = os.path.basename(possible_names[0]) if possible_names else ''
    filename = f'{msg.id}_{safe_name}' if safe_name else str(msg.id)

    stem, ext = os.path.splitext(filename)
    if not ext:
        ext = utils.get_extension(msg.media)
    return os.path.join(path, stem + ext)
async def _download_to_store(client, m, partfile):
    """Stream the media of ``m`` into the content-addressed ``files/`` store.

    The data is written to ``partfile`` while its SHA-256 is computed, then
    moved to ``files/<sha256>.<size>`` unless that file already exists.

    Args:
        client: Client providing ``iter_download``.
        m: Message whose media is downloaded.
        partfile: Path of the temporary file used while downloading.

    Returns:
        The path of the stored file, relative to the working directory.
    """
    # The reported size may be missing or zero; never divide by it blindly.
    size = m.file.size or 0
    digest = hashlib.sha256()
    received = 0
    try:
        with open(partfile, 'wb') as f:
            async for data in client.iter_download(m.media):
                digest.update(data)  # TODO: make this async
                f.write(data)
                received += len(data)
                fraction = received / size if size else 0.0
                print(
                    f'\r{m.date} https://t.me/c/{m.chat.id}/{m.id:<6} {fraction:>7.2%} {received / 1024 / 1024:7.2f} MiB / {size / 1024 / 1024:<7.2f}MiB {m.file.name}',
                    end='')
        fn = f"files/{digest.hexdigest()}.{m.file.size}"
        if os.path.lexists(fn):
            # Same content was stored before; the temporary copy is dropped
            # by the cleanup below.
            print(" Duplicate!")
        else:
            os.rename(partfile, fn)
            print()
        return fn
    finally:
        # Covers duplicates as well as failed or interrupted downloads, so
        # no partial file is left behind.
        if os.path.lexists(partfile):
            os.remove(partfile)


async def main(client, args):
    """Download every matching file of a chat, deduplicated by content.

    Files are stored once under ``files/`` and symlinked from a per-chat
    directory; each symlink gets the message date as its timestamps.
    Messages whose link already exists are skipped. A failure on one message
    is printed with its traceback and does not stop the run.

    Args:
        client: Telethon client used to read messages and download media.
        args: Parsed command-line arguments; ``chat``, ``start_date`` and
            ``mime`` are read.
    """
    await client.start()
    chat = await client.get_entity(args.chat)
    os.makedirs("files", exist_ok=True)
    async for m in client.iter_messages(chat, reverse=True, offset_date=args.start_date):
        if m.file is None:
            continue
        if args.mime is not None and (m.file.mime_type is None or not args.mime.match(m.file.mime_type)):
            continue
        try:
            sn = f'{m.chat.id}/'
            helpers.ensure_parent_dir_exists(sn)
            sn = get_name(m, sn)
            if os.path.exists(sn):
                size_mib = (m.file.size or 0) / 1024 / 1024
                print(f'{m.date} https://t.me/c/{m.chat.id}/{m.id:<6} 100.00% {size_mib:7.2f} MiB / {size_mib:<7.2f}MiB {m.file.name} ALREADY EXISTS!')
                continue
            partfile = f".{m.chat.id}_{m.id}.partfile"
            # Remove leftovers (including a dangling symlink) before writing.
            if os.path.lexists(partfile):
                os.remove(partfile)
            fn = await _download_to_store(client, m, partfile)
            # os.path.exists() above follows symlinks, so a broken link can
            # still be present here and must be replaced.
            if os.path.lexists(sn):
                os.remove(sn)
            # The link lives one directory below the working directory.
            os.symlink("../" + fn, sn)
            # Stamp the link itself (not its target) with the message date.
            os.utime(sn, (m.date.astimezone().timestamp(), )*2, follow_symlinks=False)
        except Exception:
            print(f"\n{m.date} https://t.me/c/{m.chat.id}/{m.id} ERROR downloading:")
            traceback.print_exc()
if __name__ == '__main__':
    # Command-line interface: API credentials and the chat id are required,
    # the filters and the session name are optional.
    cli = argparse.ArgumentParser(description='Download telegram chat files')
    cli.add_argument('api_id', type=int)
    cli.add_argument('api_hash')
    cli.add_argument('chat', type=int)
    cli.add_argument(
        '--mime',
        type=re.compile,
        default=None,
        help="Download only files with mime-type matching this regexp",
    )
    cli.add_argument(
        '--start-date',
        type=datetime.fromisoformat,
        default=None,
        help="Start downloading from this date",
    )
    cli.add_argument('--sess-name', default=None)
    args = cli.parse_args()

    # ``client`` has to stay a module-level name: get_name() reads it as a
    # global.
    client = TelegramClient(args.sess_name, args.api_id, args.api_hash)
    with client:
        client.loop.run_until_complete(main(client, args))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment