Last active
May 17, 2019 06:07
-
-
Save cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4 to your computer and use it in GitHub Desktop.
Instagram downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
import os | |
import re | |
from pathlib import Path | |
ROOT_URL = 'https://www.instagram.com/' | |
PROXY = 'http://127.0.0.1:1080' | |
HEADERS = { | |
'User-Agent': | |
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36', | |
'Cookie': | |
'rur=ATN; mid=XM2K8QAEAAGy8fiEf1b2T05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessionid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"' | |
} | |
class Instagram: | |
def __init__(self, username, maxtasks=200): | |
self.username = username | |
self.maxtasks = maxtasks | |
self.path = Path(f'./instagram/{username}') | |
self.path.mkdir(parents=True, exist_ok=True) | |
self.queue = asyncio.Queue(maxsize=maxtasks * 2) | |
os.environ['http_proxy'] = PROXY | |
os.environ['https_proxy'] = PROXY | |
self.session = aiohttp.ClientSession(trust_env=True, headers=HEADERS) | |
async def produce_download_urls(self, max=50): | |
end_cursor = '' | |
while True: | |
pic_params = { | |
'query_hash': | |
'f2405b236d85e8296cf30347c9f08c2a', | |
'variables': | |
'{{"id":"{0}","first":{1},"after":"{2}"}}'.format( | |
self.user_id, max, end_cursor), | |
} | |
pic_url = ROOT_URL + 'graphql/query/' | |
async with self.session.get(pic_url, params=pic_params) as resp: | |
json = await resp.json() | |
edge_media = json['data']['user'][ | |
'edge_owner_to_timeline_media'] | |
edges = edge_media['edges'] | |
if edges: | |
for edge in edges: | |
await self.queue.put(edge['node']['display_url']) | |
has_next_page = edge_media['page_info']['has_next_page'] | |
if has_next_page: | |
end_cursor = edge_media['page_info']['end_cursor'] | |
else: | |
break | |
async def download(self): | |
while not (self.producer.done() and self.queue.empty()): | |
url = await self.queue.get() | |
filename = self.path / url.split('?')[0].split('/')[-1] | |
async with self.session.get(url) as resp: | |
with filename.open('wb') as f: | |
async for chunk in resp.content.iter_any(): | |
f.write(chunk) | |
print('.', end='', flush=True) | |
async def run(self): | |
print('Preparing...') | |
async with self.session.get(ROOT_URL + self.username) as resp: | |
html = await resp.text() | |
shared_data = html.split("window._sharedData = ")[1].split( | |
";</script>")[0] | |
if not shared_data: | |
print('!!!!!!!') | |
exit(1) | |
self.user_id = re.findall( | |
'"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0] | |
self.producer = asyncio.create_task(self.produce_download_urls()) | |
print('Downloading...', end='', flush=True) | |
await asyncio.gather(*(self.download() for _ in range(self.maxtasks))) | |
async def close(self): | |
await self.session.close() | |
async def main(): | |
ins = Instagram('taeri__taeri') | |
try: | |
await ins.run() | |
finally: | |
await ins.close() | |
if __name__ == '__main__': | |
try: | |
asyncio.run(main()) | |
except KeyboardInterrupt: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment