Skip to content

Instantly share code, notes, and snippets.

@ChronoMonochrome
Last active September 22, 2018 19:02
Show Gist options
  • Save ChronoMonochrome/3c7ca2f26ca4984e5002378126469f01 to your computer and use it in GitHub Desktop.
Save ChronoMonochrome/3c7ca2f26ca4984e5002378126469f01 to your computer and use it in GitHub Desktop.
A script to download all the media from the specified Telegram channel.
import os, sys
from telethon import TelegramClient, sync
import asyncio
# These example values won't work. You must get your own api_id and
# api_hash from https://my.telegram.org, under API Development.
API_ID = 12345
API_HASH = '0123456789abcdef0123456789abcdef'

# --- Module-level state shared between main() and the download coroutines ---

# Declared global in main() but never read or updated anywhere in this file.
count = 0
# Number of fetched messages; set by main() and used as the denominator in
# the progress line printed by crawl_worker().
len_queue_full = 0
# Telegram client instance; main() creates and starts it on first use and
# reuses it on later calls.
client = None
# Fetched messages; main() stores the reversed result of download_chat() here
# and crawl_worker() indexes into it by position.
msgs = []
def download_chat(client, channel_name, root_dir, limit):
    """Fetch messages from a channel and switch into its download directory.

    Creates ``<root_dir>/<channel_name>`` if it does not exist yet, makes it
    the current working directory, and then asks the client for the
    channel's messages.

    Args:
        client: Object exposing ``get_messages(entity, limit=...)``.
        channel_name: Channel identifier; also used as the directory name.
        root_dir: Directory under which the per-channel directory is created.
        limit: Maximum number of messages to fetch. ``-1`` (or any negative
            value, or ``None``) means "no limit".

    Returns:
        Whatever ``client.get_messages`` returns for the request.
    """
    channel_dir = os.path.join(root_dir, channel_name)
    os.makedirs(channel_dir, exist_ok=True)
    # crawl_worker() saves files under relative names, so the process has to
    # be sitting inside the channel directory before downloads start.
    os.chdir(channel_dir)

    # Pass None for "everything" instead of an arbitrary 99999 cap, which
    # silently dropped messages on channels with a longer history.
    if limit is None or limit < 0:
        limit = None
    return client.get_messages(channel_name, limit=limit)
async def crawl_worker(msg_id):
    """Download the media of one message into ``<msg_id>.jpg``.

    The message is looked up by position in the module-level ``msgs`` list
    and saved relative to the current working directory. A file that already
    exists with a non-zero size is treated as done and not fetched again.

    Failures are reported on stderr and otherwise ignored so that one bad
    message does not abort the rest of the batch; task cancellation is
    propagated.

    Args:
        msg_id: Zero-based index into ``msgs``.
    """
    # Batches are scheduled in fixed-size steps, so the last one may contain
    # indices past the end of the list; those have nothing to download.
    if msg_id >= len(msgs):
        return

    file_name = "%d.jpg" % msg_id
    try:
        # Re-download when the file is missing or was left empty by an
        # earlier interrupted run.
        if (not os.path.exists(file_name)) or os.stat(file_name).st_size == 0:
            await msgs[msg_id].download_media(file=file_name)
    except asyncio.CancelledError:
        # Never swallow cancellation (it is an Exception subclass on 3.7).
        raise
    except Exception as exc:
        print(
            "[%d / %d] %s failed: %r" % (msg_id + 1, len_queue_full, file_name, exc),
            file=sys.stderr,
        )
        return
    print("[%d / %d] %s" % (msg_id + 1, len_queue_full, file_name))
async def coro(start, num_threads = 5):
    """Run one batch of concurrent downloads and wait for all of them.

    Schedules ``crawl_worker(i)`` for each index from ``start`` up to
    ``start + num_threads``, clamped to the length of the module-level
    ``msgs`` list, and returns once every scheduled task has finished.

    Args:
        start: Index of the first message in this batch.
        num_threads: Maximum number of downloads to run concurrently.
    """
    # Clamp the batch so the final one does not spawn workers for indices
    # that do not exist.
    stop = min(start + num_threads, len(msgs))
    tasks = [asyncio.ensure_future(crawl_worker(i)) for i in range(start, stop)]
    # asyncio.wait() raises ValueError when given no tasks at all.
    if tasks:
        await asyncio.wait(tasks)
def main(channel_name, root_dir = "G:/telegram", limit = -1, start = 0, num_threads = None):
    """Download the media of every fetched message of a channel.

    Creates and starts the module-level Telegram client on first use, fetches
    the channel's messages through ``download_chat`` (which also changes the
    working directory to the channel's folder), stores them reversed in the
    module-level ``msgs`` list, and then downloads them in consecutive
    batches of ``num_threads`` concurrent workers.

    Args:
        channel_name: Channel to download from.
        root_dir: Directory under which the per-channel folder is created.
        limit: Maximum number of messages to fetch; ``-1`` means no limit.
        start: Index (into the reversed message list) to begin at, e.g. to
            resume an interrupted run.
        num_threads: Concurrent downloads per batch. When ``None`` it is
            chosen automatically: 50 if the optional ``cryptg`` module can be
            imported, otherwise 8.

    Raises:
        ValueError: If ``num_threads`` is given and is smaller than 1.
    """
    global len_queue_full, client, msgs

    if num_threads is not None and num_threads < 1:
        # A batch size of zero would never advance through the queue.
        raise ValueError("num_threads must be at least 1")

    if not client:
        client = TelegramClient('session_name', API_ID, API_HASH)
        client.start()

    print("getting messages from %s" % channel_name)
    msgs = download_chat(client, channel_name, root_dir, limit)[::-1]
    len_queue_full = len(msgs)
    print("ready")

    if num_threads is None:
        # Allow a much larger batch when the optional cryptg module is
        # installed; only a failed import should select the small batch.
        try:
            import cryptg  # noqa: F401 -- imported only to probe availability
            num_threads = 50
        except ImportError:
            num_threads = 8

    ioloop = asyncio.get_event_loop()
    # Each batch is awaited to completion before the next one is started.
    for batch_start in range(start, len_queue_full, num_threads):
        ioloop.run_until_complete(coro(batch_start, num_threads))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment