@morrah
Created August 23, 2018
Dumps direct links to the pic/video inside every post of a chosen Instagram user (Python 2; requires gevent).
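Example invocation (the username is hypothetical): python insta.py natgeo writes insta_posts.txt, insta_videos.txt and insta_images.txt to the current directory.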
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Monkey-patch the standard library first so gevent can cooperatively
# schedule the blocking urllib2 calls made by the green threads below.
from gevent.monkey import patch_all; patch_all()
from gevent.pool import Pool

import sys
from datetime import datetime
import ssl
import urllib
import urllib2
import json
import re
import hashlib
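# Fetch a URL with browser-like request headers; returns the response
# body as a string, or None if urllib2 raised an HTTP/URL error.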
def get_page(url, headers=None):
    _headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36"
    }
    if headers:
        _headers.update(headers)
    req = urllib2.Request(url)
    context = None
    if SKIP_CERT_CHECK:
        context = ssl._create_unverified_context()
    for k, v in _headers.iteritems():
        req.add_header(k, v)
    response = None
    try:
        response = urllib2.urlopen(req, context=context)
    except urllib2.HTTPError as e:
        print '%s: Error %s - %s' % (datetime.now(), e.code, e.msg)
    except urllib2.URLError as e:
        print '%s: Error %s - %s' % (datetime.now(), e.args, e)
    if not response:
        return
    return response.read()
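# Pull the window._sharedData JSON blob out of the profile page source
# and return the graphql payload, the rhx_gis token and the user id.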
def parse_base_page(page_src):
    match = re.search(r'<script type="text/javascript">window\._sharedData = (.*?);</script>', page_src)
    data = None
    rhx_gis = None
    user_id = None
    if match:
        shared_data = json.loads(match.group(1))
        data = shared_data['entry_data']['ProfilePage'][0]['graphql']
        rhx_gis = shared_data['rhx_gis']
        user_id = data['user']['id']
    return data, rhx_gis, user_id
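# Turn one page of the timeline-media JSON into post permalinks and
# return them together with the end_cursor used to request the next page.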
def json_to_posts(data):
    posts = []
    end_cursor = data['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
    nodes = data['user']['edge_owner_to_timeline_media']['edges']
    for node in nodes:
        shortcode = node['node']['shortcode']
        print shortcode
        full_url = 'https://www.instagram.com/p/%s/' % shortcode
        posts.append(full_url)
    return posts, end_cursor
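# Compute the value Instagram expects in the "x-instagram-gis" header:
# the md5 hex digest of the page's rhx_gis token and the JSON-encoded
# query variables, joined by a colon.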
def x_instagram_gis(rhx_gis, query):
    hashstring = '%s:%s' % (rhx_gis, json.dumps(query))
    return hashlib.md5(hashstring).hexdigest()
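# A sketch with made-up values: x_instagram_gis('abc123', {"__a": 1})
# equals hashlib.md5('abc123:' + '{"__a": 1}').hexdigest(), since
# json.dumps({"__a": 1}) produces '{"__a": 1}'.

# Fetch a single post's JSON (?__a=1) and append the direct video url,
# or the highest-resolution image url, to the shared result lists.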
def async_get_post(url, headers, videos, images):
    page = get_page(url, headers)
    try:
        page_json = json.loads(page)
    except ValueError:
        print page
        print url
        raise
    if page_json['graphql']['shortcode_media']['is_video']:
        download_url = page_json['graphql']['shortcode_media']['video_url']
        print download_url
        videos.append(download_url)
    elif page_json['graphql']['shortcode_media']['__typename'] == 'GraphImage':
        download_url = page_json['graphql']['shortcode_media']['display_resources'][-1]['src']
        print download_url
        images.append(download_url)
if __name__ == "__main__":
    # 1. parse the base page for the "rhx_gis" and "end_cursor" params plus the first posts json
    # 2. calculate the "x-instagram-gis" header as md5(${rhx_gis}:${json_encoded_query_string})
    # 3. send the graphql query to retrieve the next page with a new end_cursor
    LOAD_SIZE = 12           # default number of posts loaded per page in the browser
    THREAD_COUNT = 30        # number of green threads for async download-url retrieval
    SKIP_CERT_CHECK = False  # set to True in case of certificate problems you can't deal with
    if len(sys.argv) == 2:
        username = sys.argv[1]
        use_posts_from_file = False
    elif len(sys.argv) == 3:
        username = sys.argv[1]
        use_posts_from_file = (sys.argv[2] == "1")
    else:
        print 'usage: python insta.py <instagram_username> [1]'
        print '  1 = use post urls from the insta_posts.txt file'
        print 'outputs:'
        print '  "insta_posts.txt" with direct urls to every post;'
        print '  "insta_videos.txt" with direct urls to every video;'
        print '  "insta_images.txt" with direct urls to every image in highest resolution;'
        print '  use wget to download any of these direct url files'
        print 'example: wget64 -i %1 -P %~n1 --no-check-certificate -nc'
        sys.exit()
    print username
    url = 'https://www.instagram.com/%s/' % username
    page_src = get_page(url)
    data, rhx_gis, user_id = parse_base_page(page_src)
    print 'rhx_gis: %s; user_id: %s' % (rhx_gis, user_id)
    if not use_posts_from_file:
        posts, end_cursor = json_to_posts(data)
        with open('insta_posts.txt', 'w') as f:
            f.write('\n'.join(posts))
        while end_cursor:
            query = {"id": user_id, "first": LOAD_SIZE, "after": end_cursor}
            headers = {
                'x-instagram-gis': x_instagram_gis(rhx_gis, query),
            }
            variables = urllib.quote_plus(json.dumps(query))
            ajax_url = 'https://www.instagram.com/graphql/query/?query_hash=a5164aed103f24b03e7b7747a2d94e3c&variables=%s' % variables
            page_src = get_page(ajax_url, headers)
            posts, end_cursor = json_to_posts(json.loads(page_src)['data'])
            with open('insta_posts.txt', 'a') as f:
                f.write('\n' + '\n'.join(posts))
    # retrieve the direct url of the content inside every post
    with open('insta_posts.txt', 'r') as f:
        posts = f.read().split()
    videos = []
    images = []
    pool = Pool(THREAD_COUNT)
    query = {"__a": 1}
    headers = {
        'x-instagram-gis': x_instagram_gis(rhx_gis, query),
    }
    for post in posts:
        url = post + '?__a=1'
        pool.spawn(async_get_post, url, headers, videos, images)
    pool.join()
    with open('insta_videos.txt', 'w') as f:
        f.write('\n'.join(videos))
    with open('insta_images.txt', 'w') as f:
        f.write('\n'.join(images))