Skip to content

Instantly share code, notes, and snippets.

@JuniorJPDJ
Last active February 28, 2021 16:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JuniorJPDJ/41bf84981bcf0bac88079e0944edb9fa to your computer and use it in GitHub Desktop.
Save JuniorJPDJ/41bf84981bcf0bac88079e0944edb9fa to your computer and use it in GitHub Desktop.
Telegram batch file downloader (with deduplication and file date preservation)
#!/usr/bin/env python3
import hashlib
import os
import time
import traceback
from datetime import datetime
import argparse
import re
from telethon import TelegramClient, helpers, utils
def get_name(msg, path):
    """Build the destination path for the file attached to ``msg``.

    The file name is ``<message id>_<first candidate name>``, where the
    candidate names are obtained from the document attributes through the
    module-level ``client`` (``client._get_kind_and_names``). When no
    candidate name is available the bare message id is used. If the chosen
    name has no extension, the one returned by ``utils.get_extension`` for
    the media is appended.

    Args:
        msg: Message whose ``media`` describes the file; ``msg.id`` is used
            as the name prefix.
        path: Directory that the computed file name is joined onto.

    Returns:
        ``path`` joined with the computed file name.
    """
    # Not every media object carries a ``document`` (presumably photos do
    # not -- confirm against Telethon), so look it up defensively.
    document = getattr(msg.media, 'document', None)
    attributes = getattr(document, 'attributes', None) or ()
    _kind, possible_names = client._get_kind_and_names(attributes)

    # Fall back to the message id alone when there is no candidate name,
    # instead of failing on an empty list.
    if possible_names:
        name = f'{msg.id}_{possible_names[0]}'
    else:
        name = str(msg.id)

    # Candidate names are supplied remotely; a path separator inside one
    # would make the result point into another (possibly missing) directory.
    for separator in (os.sep, os.altsep):
        if separator:
            name = name.replace(separator, '_')

    name, ext = os.path.splitext(name)
    if not ext:
        ext = utils.get_extension(msg.media)
    # NOTE: client._get_proper_filename(...) is another way to derive this
    # path; it is not used here.
    return os.path.join(path, name + ext)
async def main(client, args):
    """Download every matching file of a chat, deduplicating by content.

    For each message that has a file (and whose mime type matches
    ``args.mime`` when that is set) the file is streamed into a hidden
    ``.<chat id>_<message id>.partfile`` while its SHA-256 is computed. The
    finished file is stored as ``files/<sha256>.<size>`` unless that path
    already exists, in which case the download is discarded as a duplicate.
    A symlink ``<chat id>/<name>`` pointing at the stored file is then
    (re)created and its timestamps are set to the message date.

    Messages whose symlink path already resolves to an existing file are
    skipped. A failure on one message is printed with its traceback and the
    loop moves on to the next message.

    Args:
        client: Telethon client used to start the session, resolve the chat
            and download data.
        args: Parsed command-line arguments; ``chat``, ``mime`` and
            ``start_date`` are read here.
    """
    await client.start()
    chat = await client.get_entity(args.chat)
    os.makedirs("files", exist_ok=True)

    async for m in client.iter_messages(chat, reverse=True, offset_date=args.start_date):
        if m.file is None:
            continue
        if args.mime is not None and (
            m.file.mime_type is None or not args.mime.match(m.file.mime_type)
        ):
            continue

        partfile = None
        try:
            mm = m.media
            chat_id = m.chat.id
            sn = f'{chat_id}/'
            helpers.ensure_parent_dir_exists(sn)
            sn = get_name(m, sn)

            # Shared pieces of the progress / status lines.
            link = f'{m.date} https://t.me/c/{chat_id}/{m.id:<6}'
            total = m.file.size
            total_mib = total / 1024 / 1024

            # os.path.exists follows the symlink, so a dangling link does
            # not count as "already downloaded".
            if os.path.exists(sn):
                print(f'{link} 100.00% {total_mib:7.2f} MiB / {total_mib:<7.2f}MiB {m.file.name} ALREADY EXISTS!')
                continue

            partfile = f".{chat_id}_{m.id}.partfile"
            if os.path.lexists(partfile):
                os.remove(partfile)

            sha = hashlib.sha256()
            pos = 0
            with open(partfile, 'wb') as f:
                async for data in client.iter_download(mm):
                    sha.update(data)  # TODO: make this async
                    f.write(data)
                    pos += len(data)
                    print(
                        f'\r{link} {pos / total:>7.2%} {pos / 1024 / 1024:7.2f} MiB / {total_mib:<7.2f}MiB {m.file.name}',
                        end='')

            # Content-addressed location: identical files share one copy.
            fn = f"files/{sha.hexdigest()}.{m.file.size}"
            if os.path.lexists(fn):
                os.remove(partfile)
                print(" Duplicate!")
            else:
                os.rename(partfile, fn)
                print()

            # Replace any stale (e.g. dangling) link before creating the new one.
            if os.path.lexists(sn):
                os.remove(sn)
            # The link lives one directory below the working directory,
            # hence the "../" prefix on the target.
            os.symlink("../" + fn, sn)
            stamp = m.date.astimezone().timestamp()
            # Set the times on the link itself, not on the shared target.
            os.utime(sn, (stamp, stamp), follow_symlinks=False)
        except Exception:
            print(f"\n{m.date} https://t.me/c/{m.chat.id}/{m.id} ERROR downloading:")
            traceback.print_exc()
        finally:
            # On success the part file has already been removed or renamed;
            # anything still present here is a partial download left by a
            # failure or interruption, so drop it (best effort).
            if partfile is not None and os.path.lexists(partfile):
                try:
                    os.remove(partfile)
                except OSError:
                    pass
if __name__ == '__main__':
    # Command-line interface: three required positionals followed by the
    # optional filters and the session name.
    cli = argparse.ArgumentParser(description='Download telegram chat files')
    cli.add_argument('api_id', type=int)
    cli.add_argument('api_hash')
    cli.add_argument('chat', type=int)
    cli.add_argument(
        '--mime',
        type=re.compile,
        default=None,
        help="Download only files with mime-type matching this regexp",
    )
    cli.add_argument(
        '--start-date',
        type=datetime.fromisoformat,
        default=None,
        help="Start downloading from this date",
    )
    cli.add_argument('--sess-name', default=None)
    args = cli.parse_args()

    # ``client`` has to remain a module-level name: get_name() reads it as
    # a global.
    client = TelegramClient(args.sess_name, args.api_id, args.api_hash)
    with client:
        client.loop.run_until_complete(main(client, args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment