Skip to content

Instantly share code, notes, and snippets.

@dvingerh
Last active May 11, 2020 20:04
Show Gist options
  • Save dvingerh/2745b62f46cf2f20852a4393c8c72f8c to your computer and use it in GitHub Desktop.
Save dvingerh/2745b62f46cf2f20852a4393c8c72f8c to your computer and use it in GitHub Desktop.
import threading
import time
import os
import sys
import codecs
import json
import argparse
import datetime
import subprocess
from xml.dom.minidom import parseString
try:
import urllib.request as urllib
except ImportError:
import urllib as urllib
try:
from instagram_private_api import (
Client, ClientError, ClientLoginError,
ClientCookieExpiredError, ClientLoginRequiredError,
__version__ as client_version)
except ImportError:
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from instagram_private_api import (
Client, ClientError, ClientLoginError,
ClientCookieExpiredError, ClientLoginRequiredError,
__version__ as client_version)
from instagram_private_api import ClientConnectionError
from instagram_private_api import ClientError
from instagram_private_api import ClientThrottledError
from instagram_private_api import Client, ClientCompatPatch
##Classes and Vars
class media_post:
    """Plain record describing one feed post and the media URLs it contains.

    Attributes:
        post_code: The post's 'code' value from the feed item, or None.
        post_caption: Caption text of the post, or None.
        post_content_images: List of image URLs belonging to the post.
        post_content_videos: List of video URLs belonging to the post.
    """

    def __init__(self, **kwargs):
        """Create a post record, optionally pre-filled from keyword arguments.

        The four attribute names are recognised as keywords. Any other
        keyword is ignored, so existing callers that pass nothing (or pass
        unrelated keywords) keep working unchanged.
        """
        self.post_code = kwargs.get('post_code')
        self.post_caption = kwargs.get('post_caption')
        # Copy the sequences so an instance never shares a list with its caller.
        self.post_content_images = list(kwargs.get('post_content_images') or [])
        self.post_content_videos = list(kwargs.get('post_content_videos') or [])
# Module-level state shared by the downloader functions.
# Rebound by get_media_posts() on every run.
new_posts = []
# Rebound by get_media_story(); holds plain file paths, so stories need
# no wrapper class of their own.
new_stories = []
user_to_check = None

# Version strings shown in the start-up banner.
script_version = "1.0"
# First space-separated token of sys.version.
python_version = sys.version.split(' ', 1)[0]
##Login
def to_json(python_object):
    """Serialise a bytes value as a tagged dict for json.dump's ``default`` hook.

    Returns a dict with '__class__' set to 'bytes' and '__value__' holding
    the base64 text of the input.

    Raises:
        TypeError: if ``python_object`` is not a bytes instance.
    """
    if not isinstance(python_object, bytes):
        raise TypeError(repr(python_object) + ' is not JSON serializable')
    encoded = codecs.encode(python_object, 'base64')
    return {'__class__': 'bytes', '__value__': encoded.decode()}
def from_json(json_object):
    """Reverse to_json(): turn a tagged bytes dict back into bytes.

    Used as a json ``object_hook``. Dicts whose '__class__' entry is not
    'bytes' are returned untouched.
    """
    if json_object.get('__class__') != 'bytes':
        return json_object
    encoded_text = json_object.get('__value__')
    return codecs.decode(encoded_text.encode(), 'base64')
def onlogin_callback(api, settings_file):
    """Persist the client's settings to ``settings_file`` as JSON.

    Bytes values inside the settings are serialised through to_json().
    Prints a confirmation line once the file has been written.
    """
    settings_snapshot = api.settings
    with open(settings_file, 'w') as cookie_file:
        json.dump(settings_snapshot, cookie_file, default=to_json)
        print('[I] New auth cookie file was made: {0!s}'.format(settings_file))
def _parse_error_response(error_response):
    """Decode an API error body into a dict; return {} if it is not a JSON object."""
    try:
        payload = json.loads(error_response)
    except (TypeError, ValueError):
        return {}
    return payload if isinstance(payload, dict) else {}


def login(username="", password=""):
    """Return an authenticated ``Client``, reusing 'credentials.json' when present.

    Without a cookie file a fresh login is performed and the new settings
    are saved through onlogin_callback(). With a cookie file the cached
    settings are reused. If they turn out to be expired, a relogin is
    attempted with the given credentials and the cached device id.

    Args:
        username: Instagram username; may be empty when a valid cookie exists.
        password: Instagram password; may be empty when a valid cookie exists.

    Returns:
        The logged-in ``Client`` instance.

    Every failure path prints a diagnostic and terminates the process via
    ``sys.exit`` with a non-zero status (1, 9 or 99).
    """
    settings_file = "credentials.json"
    device_id = None
    try:
        try:
            if not os.path.isfile(settings_file):
                # settings file does not exist
                print('[W] Unable to find auth cookie file: {0!s} (creating a new one...)'.format(settings_file))
                # login new
                api = Client(
                    username, password,
                    on_login=lambda x: onlogin_callback(x, settings_file))
            else:
                with open(settings_file) as file_data:
                    cached_settings = json.load(file_data, object_hook=from_json)
                device_id = cached_settings.get('device_id')
                # reuse auth settings
                api = Client(
                    username, password,
                    settings=cached_settings)
                print('[I] Using cached login cookie for "' + api.authenticated_user_name + '".')
        except (ClientCookieExpiredError, ClientLoginRequiredError) as e:
            print('[E] ClientCookieExpiredError/ClientLoginRequiredError: {0!s}'.format(e))
            # Login expired
            if not (username and password):
                print("[E] The login cookie has expired, but no login arguments were given.")
                print("[E] Please supply --username and --password arguments.")
                print('-' * 70)
                # This is a failure, so report it with a non-zero status.
                sys.exit(1)
            # Do relogin but use default ua, keys and such. The relogin sits
            # inside the outer try so its own errors reach the handlers
            # below instead of escaping as a raw traceback.
            api = Client(
                username, password,
                device_id=device_id,
                on_login=lambda x: onlogin_callback(x, settings_file))
    except ClientLoginError as e:
        details = _parse_error_response(e.error_response)
        print('[E] Could not login: {0!s}.\n[E] {1!s}\n\n{2!s}'.format(
            details.get("error_title", "Error title not available."),
            details.get("message", "Not available"),
            e.error_response))
        print('-' * 70)
        sys.exit(9)
    except ClientError as e:
        details = _parse_error_response(e.error_response)
        # '!s' rather than ':d' so a missing or non-integer code cannot
        # raise while the error is being reported.
        print('[E] Client Error: {0!s}\n[E] Message: {1!s}\n[E] Code: {2!s}\n\n[E] Full response:\n{3!s}\n'.format(
            e.msg,
            details.get("message", "Additional error information not available."),
            e.code,
            e.error_response))
        print('-' * 70)
        sys.exit(9)
    except Exception as e:
        if str(e).startswith("unsupported pickle protocol"):
            print("[W] This cookie file is not compatible with Python {}.".format(sys.version.split(' ')[0][0]))
            print("[W] Please delete your cookie file 'credentials.json' and try again.")
        else:
            print('[E] Unexpected Exception: {0!s}'.format(e))
        print('-' * 70)
        sys.exit(99)

    print('[I] Login to "' + api.authenticated_user_name + '" OK!')
    cookie_expiry = api.cookie_jar.expires_earliest
    # fromtimestamp() rejects None, so skip this line if the cookie jar
    # reports no expiry.
    if cookie_expiry is not None:
        print('[I] Login cookie expiry date: {0!s}'.format(
            datetime.datetime.fromtimestamp(cookie_expiry).strftime('%Y-%m-%d at %I:%M:%S %p')))
    return api
##Downloader
def check_directories(user_to_check):
    """Ensure the per-user 'stories' and 'posts' folders exist under the cwd.

    Returns True when both directories exist or were created, and False if
    anything goes wrong while checking or creating them.
    """
    try:
        for folder_template in ("/stories/{}/", "/posts/{}/"):
            target_dir = os.getcwd() + folder_template.format(user_to_check)
            if not os.path.isdir(target_dir):
                os.makedirs(target_dir)
    except Exception:
        return False
    return True
def _collect_post_media(post_iter):
    """Build a media_post from one feed item, gathering its image and video URLs."""
    post_obj = media_post()
    post_obj.post_code = post_iter['code']
    try:
        post_obj.post_caption = post_iter['caption']['text']
    except (KeyError, TypeError):
        # 'caption' is missing, or is not a mapping (e.g. None).
        post_obj.post_caption = "No caption available."
    # Carousel entries come first, then the post's own media, so each list
    # keeps the order the feed item presents.
    for media_carousel in post_iter.get('carousel_media') or []:
        post_obj.post_content_images.append(media_carousel['image_versions2']['candidates'][0]['url'])
        if 'video_versions' in media_carousel:
            post_obj.post_content_videos.append(media_carousel['video_versions'][0]['url'])
    if 'image_versions2' in post_iter:
        post_obj.post_content_images.append(post_iter['image_versions2']['candidates'][0]['url'])
    if 'video_versions' in post_iter:
        post_obj.post_content_videos.append(post_iter['video_versions'][0]['url'])
    return post_obj


def _download_post_items(urls, extension, label, user_to_check):
    """Download each URL into posts/<user>/ unless the file exists; return True if any download ran."""
    attempted_download = False
    for url in urls:
        # Last path segment of the URL, without the query string or the
        # original extension.
        filename = url.split('/')[-1].split('?', 1)[0]
        final_filename = filename.split('.')[0] + extension
        save_path = os.getcwd() + "/posts/{}/".format(user_to_check) + final_filename
        if not os.path.exists(save_path):
            print("[I] Downloading {:s}: {:s}".format(label, final_filename))
            download_file(url, save_path)
            attempted_download = True
        else:
            print("[I] Post item already exists: {:s}".format(final_filename))
    return attempted_download


def get_media_posts(user_to_check, user_id, ig_client):
    """Download the images and videos of a user's feed posts into posts/<user>/.

    The raw feed items are also dumped to 'pyigdumper_feed_posts.json' in
    the current working directory. Files that already exist on disk are
    skipped.

    Args:
        user_to_check: Username, used for messages and the target folder.
        user_id: Id passed to ``ig_client.user_feed``.
        ig_client: Logged-in API client.

    Returns:
        The list of media_post objects for which at least one file was
        downloaded, or None when nothing new was downloaded. The same list
        is stored in the module-level ``new_posts``.

    Any error while fetching or processing the feed, and Ctrl-C, prints a
    message and exits the process with status 1.
    """
    global new_posts
    try:
        new_posts = []
        print("[I] Getting posts for user: {:s}".format(user_to_check))
        print('-' * 70)
        try:
            feed = ig_client.user_feed(user_id)
        except Exception as e:
            print("[W] An error occurred: " + str(e))
            sys.exit(1)
        list_posts_new = []
        try:
            feed_json = feed['items']
            # Context manager so the dump is flushed and the handle closed.
            with open("pyigdumper_feed_posts.json", "w") as dump_file:
                dump_file.write(json.dumps(feed_json))
            new_posts = [_collect_post_media(post_iter) for post_iter in feed_json]
            for post in new_posts:
                got_videos = _download_post_items(post.post_content_videos, ".mp4", "video", user_to_check)
                got_images = _download_post_items(post.post_content_images, ".jpg", "image", user_to_check)
                if got_videos or got_images:
                    list_posts_new.append(post)
            if list_posts_new:
                print('-' * 70)
                print("[I] Post downloading ended with " + str(len(list_posts_new)) + " new post(s) downloaded.")
                print('-' * 70)
                new_posts = list_posts_new
                return new_posts
            print('-' * 70)
            print("[I] No new posts were downloaded.")
            print('-' * 70)
            return None
        except Exception as e:
            print("[E] An error occurred: " + str(e))
            print('-' * 70)
            sys.exit(1)
    except KeyboardInterrupt:
        print("[I] User aborted download.")
        print('-' * 70)
        sys.exit(1)
def command_exists(command):
    """Return True if ``command`` can be launched as a subprocess.

    The command is run without arguments and its output is discarded. Its
    exit status is ignored; only an OSError while launching it counts as
    "does not exist".
    """
    try:
        # Context manager so the os.devnull handle is closed after the probe.
        with open(os.devnull, 'w') as fnull:
            subprocess.call([command], stdout=fnull, stderr=subprocess.STDOUT)
        return True
    except OSError:
        return False
def get_media_story(user_to_check, user_id, ig_client, hq_videos=False):
    """Download a user's current story images and videos into stories/<user>/.

    The raw story items are also dumped to 'pyigdumper_feed_story.json' in
    the current working directory. Files that already exist on disk are
    skipped.

    Args:
        user_to_check: Username, used for messages and the target folder.
        user_id: Id passed to ``ig_client.user_story_feed``.
        ig_client: Logged-in API client.
        hq_videos: When True and ffmpeg can be launched, the streams listed
            in each item's DASH manifest are downloaded separately and
            merged with ffmpeg. Otherwise the first entry of
            'video_versions' is downloaded directly.

    Returns:
        The list of newly saved file paths (images first, then videos) when
        at least one file was downloaded; the same list is stored in the
        module-level ``new_stories``. Returns None when there are no
        stories, when nothing new was downloaded, or when an ffmpeg merge
        fails, which aborts the run.

    Any other error, and Ctrl-C, prints a message and exits the process
    with status 1.
    """
    global new_stories
    # Probe the binary that will actually be run, and only once: each probe
    # spawns a process.
    ffmpeg_binary = os.getenv('FFMPEG_BINARY', 'ffmpeg')
    if hq_videos:
        if command_exists(ffmpeg_binary):
            print("[I] Downloading high quality videos enabled. Ffmpeg will be used.")
        else:
            print("[W] Downloading high quality videos enabled but Ffmpeg could not be found. Falling back to default.")
            hq_videos = False
        print('-' * 70)
    try:
        print("[I] Getting stories for user: {:s}".format(user_to_check))
        print('-' * 70)
        try:
            feed = ig_client.user_story_feed(user_id)
        except Exception as e:
            print("[W] An error occurred: " + str(e))
            print('-' * 70)
            sys.exit(1)
        try:
            feed_json = feed['reel']['items']
            with open("pyigdumper_feed_story.json", "w") as dump_file:
                dump_file.write(json.dumps(feed_json))
        except TypeError:
            # feed['reel'] could not be indexed. Presumably the API returns
            # null there when the user has no active story (TODO confirm).
            print("[I] There are no recent stories to process for this user.")
            print('-' * 70)
            return None

        story_dir = os.getcwd() + "/stories/{}/".format(user_to_check)
        list_video_v = []
        list_video_a = []
        list_video = []
        list_image = []
        list_video_new = []
        list_image_new = []

        for media in feed_json:
            if 'video_versions' in media:
                if hq_videos:
                    video_manifest = parseString(media['video_dash_manifest'])
                    video_period = video_manifest.documentElement.getElementsByTagName('Period')
                    video_representations = video_period[0].getElementsByTagName('Representation')
                    # The last representation is taken (and removed) as the
                    # video stream. The first remaining one is used as audio.
                    video_url = video_representations.pop().getElementsByTagName('BaseURL')[0].childNodes[0].nodeValue
                    audio_url = video_representations[0].getElementsByTagName('BaseURL')[0].childNodes[0].nodeValue
                    print(video_url)
                    print(audio_url)
                    list_video_v.append(video_url)
                    list_video_a.append(audio_url)
                else:
                    list_video.append(media['video_versions'][0]['url'])
            if 'image_versions2' in media:
                list_image.append(media['image_versions2']['candidates'][0]['url'])

        if hq_videos:
            for video, audio in zip(list_video_v, list_video_a):
                file_stem = video.split('/')[-1].split('.')[0]
                final_filename = file_stem + ".mp4"
                save_path_video = story_dir + file_stem + ".video.mp4"
                save_path_audio = story_dir + file_stem + ".audio.mp4"
                save_path_final = story_dir + final_filename
                if not os.path.exists(save_path_final):
                    print("[I] Downloading video: {:s}".format(final_filename))
                    try:
                        download_file(video, save_path_video)
                        download_file(audio, save_path_audio)
                        cmd = [
                            ffmpeg_binary, '-loglevel', 'fatal', '-y',
                            '-i', save_path_video,
                            '-i', save_path_audio,
                            '-c:v', 'copy', '-c:a', 'copy', save_path_final]
                        exit_code = subprocess.call(cmd, stderr=subprocess.STDOUT)
                        if exit_code != 0:
                            print("[W] FFmpeg exit code not '0' but '{:d}'.".format(exit_code))
                            os.remove(save_path_video)
                            os.remove(save_path_audio)
                            # A failed merge aborts the whole story run.
                            return None
                        print('[I] Ffmpeg generated video: %s' % os.path.basename(save_path_final))
                        os.remove(save_path_video)
                        os.remove(save_path_audio)
                        list_video_new.append(save_path_final)
                    except Exception as e:
                        print("[W] An error occurred: " + str(e))
                        sys.exit(1)
                else:
                    print("[I] Story already exists: {:s}".format(final_filename))
        else:
            for video in list_video:
                final_filename = video.split('/')[-1].split('.')[0] + ".mp4"
                save_path = story_dir + final_filename
                if not os.path.exists(save_path):
                    print("[I] Downloading video: {:s}".format(final_filename))
                    try:
                        download_file(video, save_path)
                        list_video_new.append(save_path)
                    except Exception as e:
                        print("[W] An error occurred: " + str(e))
                        sys.exit(1)
                else:
                    print("[I] Story already exists: {:s}".format(final_filename))

        for image in list_image:
            filename = (image.split('/')[-1]).split('?', 1)[0]
            final_filename = filename.split('.')[0] + ".jpg"
            save_path = story_dir + final_filename
            if not os.path.exists(save_path):
                print("[I] Downloading image: {:s}".format(final_filename))
                download_file(image, save_path)
                # Record the image so the summary and return value count it.
                list_image_new.append(save_path)
            else:
                print("[I] Story item already exists: {:s}".format(final_filename))

        if list_image_new or list_video_new:
            print('-' * 70)
            print("[I] Story downloading ended with " + str(len(list_image_new)) + " new images and " + str(len(list_video_new)) + " new videos downloaded.")
            print('-' * 70)
            new_stories = list_image_new + list_video_new
            return new_stories
        print('-' * 70)
        print("[I] No new stories were downloaded.")
        print('-' * 70)
        return None
    except Exception as e:
        print("[E] An error occurred: " + str(e))
        print('-' * 70)
        sys.exit(1)
    except KeyboardInterrupt:
        print("[I] User aborted download.")
        print('-' * 70)
        sys.exit(1)
def download_file(url, path, attempt=0):
    """Download ``url`` to ``path``, retrying after a 5 second pause on failure.

    Args:
        url: Address of the resource to fetch.
        path: Destination file path.
        attempt: Number of retries already used. Retrying stops once this
            reaches 3, so a call starting at 0 makes up to four tries.

    Returns:
        True when the file was downloaded, False when every try failed.

    A file created by a failed download is removed again. The rest of this
    script treats an existing destination path as "already downloaded", so
    a leftover partial file would never be fetched again on later runs.
    """
    existed_before = os.path.exists(path)
    while True:
        try:
            urllib.urlretrieve(url, path)
            urllib.urlcleanup()
            return True
        except Exception as e:
            if attempt == 3:
                print("[E] Retry failed three times, skipping file.")
                print('-' * 70)
                break
            attempt += 1
            print("[E] ({:d}) Download failed: {:s}.".format(attempt, str(e)))
            print("[W] Trying again in 5 seconds.")
            time.sleep(5)
    # Only delete what this call created. A file that was already there
    # before the call is left alone.
    if not existed_before and os.path.exists(path):
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup; the failure was already reported above.
            pass
    return False
def start():
    """Command-line entry point: parse arguments, log in and download posts.

    Users are taken from ``-d/--download`` or, failing that, from the text
    file given with ``-b/--batch-file`` (one username or numeric user id
    per line). For each user the required folders are created and
    get_media_posts() is called, with a 5 second pause between users.

    The process always ends through ``sys.exit``: status 0 after all users
    were processed or on Ctrl-C, status 1 on argument, login-precondition
    or directory errors.
    """
    print("-" * 70)
    print('[I] PYIGDUMPER (SCRIPT V{:s} - PYTHON V{:s}) - {:s}'.format(script_version, python_version, time.strftime('%I:%M:%S %p')))
    print("-" * 70)

    parser = argparse.ArgumentParser()
    parser.add_argument('-u', '--username', dest='username', type=str, required=False, help="Instagram username to login with.")
    parser.add_argument('-p', '--password', dest='password', type=str, required=False, help="Instagram password to login with.")
    parser.add_argument('-d', '--download', nargs='+', dest='download', type=str, required=False,
                        help="Instagram user to download stories from.")
    # The short option is '-b'. It was previously registered as '-b,' with
    # a stray comma.
    parser.add_argument('-b', '--batch-file', dest='batchfile', type=str, required=False,
                        help="Read a text file of usernames to download stories from.")
    # NOTE: this flag is parsed but not used by the code below.
    parser.add_argument('-hqv', '--hq-videos', dest='hqvideos', action='store_true', help="Get higher quality video stories. Requires Ffmpeg.")

    # Workaround to 'disable' argument abbreviations
    parser.add_argument('--usernamx', help=argparse.SUPPRESS, metavar='IGNORE')
    parser.add_argument('--passworx', help=argparse.SUPPRESS, metavar='IGNORE')
    parser.add_argument('--downloax', help=argparse.SUPPRESS, metavar='IGNORE')

    args, _ = parser.parse_known_args()

    if args.download:
        users_to_check = args.download
    elif args.batchfile:
        if not os.path.isfile(args.batchfile):
            print('[E] The specified file does not exist.')
            print("-" * 70)
            sys.exit(1)
        with open(args.batchfile) as batch_file:
            # Strip surrounding whitespace (including '\r') and skip blank
            # lines so they are not treated as usernames.
            users_to_check = [line.strip() for line in batch_file if line.strip()]
        if not users_to_check:
            print("[E] The specified file is empty.")
            print("-" * 70)
            sys.exit(1)
        print("[I] downloading {:d} users from batch file.".format(len(users_to_check)))
        print("-" * 70)
    else:
        print('[E] No usernames provided. Please use the -d or -b argument.')
        print("-" * 70)
        sys.exit(1)

    if args.username and args.password:
        ig_client = login(args.username, args.password)
    else:
        settings_file = "credentials.json"
        if not os.path.isfile(settings_file):
            print("[E] No username/password provided, but there is no login cookie present either.")
            print("[E] Please supply --username and --password arguments.")
            sys.exit(1)
        ig_client = login()

    print("-" * 70)
    print("[I] Files will be downloaded to {:s}".format(os.getcwd()))
    print("-" * 70)

    def download_user(index, user, attempt=0):
        """Download one user's posts, retrying the whole user up to three times."""
        try:
            if not user.isdigit():
                user_res = ig_client.username_info(user)
                user_id = user_res['user']['pk']
            else:
                # An all-digit argument is treated as a user id and
                # resolved to its username.
                user_id = user
                user_info = ig_client.user_info(user_id)
                if not user_info.get("user", None):
                    raise Exception("No user is associated with the given user id.")
                user = user_info.get("user").get("username")
            print("[I] Getting stories for: {:s}".format(user))
            print('-' * 70)
            if check_directories(user):
                follow_res = ig_client.friendships_show(user_id)
                if follow_res.get("is_private") and not follow_res.get("following"):
                    raise Exception("You are not following this private user.")
                # Only posts are fetched here; get_media_story() is not called.
                get_media_posts(user, user_id, ig_client)
            else:
                print("[E] Could not make required directories. Please create a 'stories' folder manually.")
                sys.exit(1)
            if (index + 1) != len(users_to_check):
                print('-' * 70)
                print('[I] ({}/{}) 5 second time-out until next user...'.format((index + 1), len(users_to_check)))
                time.sleep(5)
                print('-' * 70)
        except Exception as e:
            if attempt == 3:
                print("[E] Retry failed three times, skipping user.")
                print('-' * 70)
                return
            attempt += 1
            print("[E] ({:d}) Download failed: {:s}.".format(attempt, str(e)))
            print("[W] Trying again in 5 seconds.")
            time.sleep(5)
            print('-' * 70)
            download_user(index, user, attempt)

    for index, user_to_check in enumerate(users_to_check):
        try:
            download_user(index, user_to_check)
        except KeyboardInterrupt:
            print('-' * 70)
            print("[I] The operation was aborted.")
            print('-' * 70)
            sys.exit(0)
    sys.exit(0)
# Run the downloader as soon as this file is executed (or imported).
start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment