Skip to content

Instantly share code, notes, and snippets.

@ripiuk
Last active June 19, 2019 21:08
Show Gist options
  • Save ripiuk/be3bfe334b31306f8921acbee2ecc371 to your computer and use it in GitHub Desktop.
Get instagram images/videos of some user
import re
import os
import time
import uuid
import json
import asyncio
import typing as typ
import urllib.parse as urlparse
from argparse import ArgumentParser
from aiohttp import ClientSession
def flatten(li: typ.List[typ.Union[str, typ.List[str]]]) -> typ.Iterable[str]:
    """Lazily yield every string from *li*, descending into nested lists.

    Sidecar (multi-image) posts are stored as inner lists of URLs; this
    generator walks them recursively so downloads see a flat URL stream.
    """
    for item in li:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item
async def download_imgs(imgs: typ.List[typ.Union[str, typ.List[str]]],
                        download_dir: str, session: ClientSession) -> None:
    """Concurrently download every media URL in *imgs* into *download_dir*.

    Nested lists (sidecar posts) are flattened first. Each file gets a
    random UUID name; the extension is guessed from the URL ("mp4" substring
    means video, anything else is saved as jpg).

    :param imgs: flat or nested list of media URLs.
    :param download_dir: target directory, created if missing.
    :param session: shared aiohttp client session used for all requests.
    """
    # exist_ok avoids the exists()/makedirs() check-then-act race.
    os.makedirs(download_dir, exist_ok=True)

    async def _download_image(data_url: str) -> None:
        # Fetch the whole payload, then write it out in one go.
        async with session.get(data_url) as response:
            resp_data = await response.read()
            # Crude sniffing: Instagram media URLs embed the container
            # format in the path, so "mp4" in the URL marks a video.
            extension = "mp4" if "mp4" in data_url else "jpg"
            with open(f"{download_dir}/{uuid.uuid4()}.{extension}", 'wb') as file:
                file.write(resp_data)

    tasks = [asyncio.ensure_future(_download_image(img)) for img in flatten(imgs)]
    await asyncio.gather(*tasks)
async def get_imgs(session: ClientSession, user: str, limit: int,
                   with_child: bool = True, with_video: bool = True) -> typ.List[typ.Union[str, typ.List[str]]]:
    """Collect up to *limit* media URLs from a public Instagram profile.

    Scrapes the profile page for the embedded ``window._sharedData`` JSON to
    get the user id, then pages through the GraphQL timeline endpoint.

    :param session: shared aiohttp client session.
    :param user: Instagram username.
    :param limit: maximum number of posts to return.
    :param with_child: if True, sidecar (multi-media) posts become nested lists.
    :param with_video: if True, use the video URL for video posts instead of
        the cover image.
    :return: list of URLs; sidecar posts appear as inner lists of URLs.
    :raises KeyError, IndexError, json.JSONDecodeError, AttributeError: when
        the page layout or JSON payload does not match the expected schema.
    """
    data, posts_num, has_next_page = [], 0, True
    headers = {
        "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;"
                  "q=0.8,application/signed-exchange;v=b3",
        "accept-encoding": "gzip, deflate, br",
        "accept-language": "uk-UA,uk;q=0.9,ru;q=0.8,en-US;q=0.7,en;q=0.6",
        "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36",
    }
    async with session.get("https://www.instagram.com/" + user, headers=headers) as response:
        resp = await response.text()
    # The profile page embeds its data as "window._sharedData = {...};<".
    shared_data = re.search(r'(?<=window._sharedData = ).*?(?=;<)', resp).group(0)
    shared_data = json.loads(shared_data)
    user_info = shared_data['entry_data']['ProfilePage'][0]['graphql']['user']
    user_id = user_info['id']
    timeline_media = user_info['edge_owner_to_timeline_media']
    print(f"Max img in the profile: {timeline_media['count']}")

    def get_data(edges, aim=None):
        # Append URLs from GraphQL edges into *aim* (outer `data` by default).
        # Sidecar posts recurse into a freshly appended nested list.
        aim = data if not isinstance(aim, list) else aim
        for el in edges:
            if with_child and el['node'].get('edge_sidecar_to_children'):
                aim.append(list())
                get_data(el['node']['edge_sidecar_to_children']['edges'], aim=aim[-1])
            elif with_video and el['node']['is_video']:
                aim.append(el['node']['video_url'])
            else:
                aim.append(el['node']['display_url'])

    # NOTE(review): `variables` must live OUTSIDE the loop so the `after`
    # cursor assigned each iteration survives into the next request;
    # rebuilding it per iteration would refetch the first page forever.
    variables = dict(id=user_id, first=50)
    while posts_num < limit:
        if not has_next_page:
            return data[:limit]
        params = {
            "query_hash": "f2405b236d85e8296cf30347c9f08c2a",
            "variables": json.dumps(variables)
        }
        query = urlparse.urlencode(params)
        url = "https://www.instagram.com/graphql/query/?" + query
        headers = {
            "accept": "*/*",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "uk-UA,uk;q=0.9,ru;q=0.8,en-US;q=0.7,en;q=0.6",
            "referer": f"https://www.instagram.com/{user}/",
            "user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/75.0.3770.80 Safari/537.36",
            "x-requested-with": "XMLHttpRequest"
        }
        async with session.get(url, headers=headers) as response:
            resp = await response.json()
        timeline_media = resp['data']['user']['edge_owner_to_timeline_media']
        has_next_page = timeline_media['page_info']['has_next_page']
        # Carry the pagination cursor into the next iteration's request.
        variables['after'] = timeline_media['page_info']['end_cursor']
        get_data(timeline_media['edges'])
        posts_num += len(timeline_media['edges'])
    return data[:limit]
async def main(args):
    """Fetch post URLs for ``args.user``, print them, optionally download.

    :param args: parsed CLI namespace (user, limit, child, video, save, dir).
    """
    semaphore = asyncio.Semaphore(100)
    async with ClientSession() as session:
        async with semaphore:
            data = await get_imgs(session, args.user, limit=args.limit,
                                  with_child=args.child, with_video=args.video)
            data_count = len(data)
            post_length = len(str(data_count)) * 2
            print(f"Got {data_count} posts (limit: {args.limit}):")
            print(f"{'post №':>{post_length}} | url")
            for i, data_url in enumerate(data, 1):
                if isinstance(data_url, list):
                    # Sidecar post: number sub-items as "<post>.<child>".
                    for j, sub_url in enumerate(data_url, 1):
                        label = f"{i}.{j}"
                        print(f"{label:>{post_length}} | {sub_url}")
                else:
                    print(f"{i:>{post_length}} | {data_url}")
            if args.save:
                args.dir = f"{args.dir}/{args.user}"
                await download_imgs(data, args.dir, session)
                print(f"Saved {data_count} posts to the {args.dir} directory")
if __name__ == "__main__":
    # CLI: positional username plus download/filter switches.
    parser = ArgumentParser()
    parser.add_argument("user", type=str, help="username in instagram")
    parser.add_argument("-l", "--limit", type=int, default=12, help="number of posts")
    parser.add_argument("-s", "--save", action="store_true", help="download images")
    parser.add_argument("-d", "--dir", type=str, help="save images to the directory", default="imgs")
    parser.add_argument("-v", "--video", action="store_false", help="ignore videos, store only cover images for them")
    parser.add_argument("-c", "--child", action="store_false", help="ignore images from collections")
    arguments = parser.parse_args()

    start = time.time()
    try:
        # Drive the async entry point to completion on the default loop.
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.ensure_future(main(arguments)))
    except (KeyError, IndexError, json.JSONDecodeError) as err:
        # Scraping failures surface as schema/parse errors; report and exit.
        print("Got an error:", type(err), err)
    print("Time:", time.time() - start)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment