Skip to content

Instantly share code, notes, and snippets.

@jsanz
Created July 27, 2019 15:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jsanz/7ef5befda5b1cae05e551e5cf0f23274 to your computer and use it in GitHub Desktop.
Save jsanz/7ef5befda5b1cae05e551e5cf0f23274 to your computer and use it in GitHub Desktop.
Python: Download Instagram pictures
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
# NOTE(review): get_ipython() exists only when this runs inside an
# IPython/Jupyter kernel — as a plain script this line raises NameError.
# It shells out to pip to install the (third-party) InstagramApi package.
get_ipython().system('pip3 install -U InstagramApi')
# In[37]:
import datetime
import json
import logging
import os
import sys
from copy import deepcopy
from multiprocessing import Pool
from pathlib import Path

import requests

from InstagramAPI import InstagramAPI
# In[4]:
logging.basicConfig(
level=logging.DEBUG,
format=' %(asctime)s - %(levelname)s - %(message)s',
datefmt='%I:%M:%S %p')
logger = logging.getLogger()
logger.handlers[0].stream = sys.stdout
logging.getLogger("requests").setLevel(logging.WARNING)
# In[21]:
# Directory where the per-post JSON metadata and downloaded images are stored.
# NOTE(review): the directory is assumed to exist already — nothing below
# creates it; confirm, or add DATA_DIR.mkdir(exist_ok=True).
DATA_DIR = Path('./data')
# ## Getting and storing your Instagram data
# In[ ]:
logger.info('Logging into Instagram...')
# Credentials come from the environment so they never live in the notebook.
# FIX(review): the original used os.environ without ever importing os.
ig = InstagramAPI(os.environ['IG_USER'], os.environ['IG_PASS'])
ig.login()
# In[ ]:
logger.info('Retrieving all your posts...')
# Pulls the complete feed of the authenticated user (network-bound).
items = ig.getTotalSelfUserFeed()
# In[ ]:
logger.info('{} items retrieved'.format(len(items)))
# In[ ]:
logger.info('Saving into /data folder individual json objects...')
for post in items:
    # One JSON file per post, named after the post id.
    post_id = post['id']  # renamed from `id` to stop shadowing the builtin
    json_file = DATA_DIR.joinpath('{}.json'.format(post_id))
    # FIX(review): the original wrapped this in `with json_file.open('w')`
    # but never used the handle — write_text() opens/writes/closes itself,
    # so the extra open() only truncated the file through a second handle.
    json_file.write_text(json.dumps(post))
# ## Reading and downloading your pictures
# In[5]:
# ## Reading and downloading your pictures
# Load every previously saved post back from ./data into memory.
json_files = list(Path('./data').glob('*.json'))
items = [json.loads(json_file.read_text()) for json_file in json_files]
# In[121]:
def get_image_file(image_id, index, img_format='jpg'):
    """Build the local path for a downloaded image inside DATA_DIR.

    Carousel images (index > 0) get an ``_<index>`` suffix; a single
    image (index 0) is named after the post id alone.
    """
    suffix = '_{}'.format(index) if index > 0 else ''
    filename = '{}{}.{}'.format(image_id, suffix, img_format)
    return DATA_DIR.joinpath(filename)
def get_biggest_file(asset):
    """Return the widest image candidate of *asset*, or None if none qualify.

    Raises a generic Exception naming the asset id when the expected
    'image_versions2'/'candidates' keys are missing.
    """
    try:
        widest, widest_width = None, 0
        for cand in asset['image_versions2']['candidates']:
            width = int(cand['width'])
            # strictly greater: ties keep the earliest candidate seen
            if width > widest_width:
                widest, widest_width = cand, width
        return widest
    except KeyError:
        raise Exception('KeyError on image {}'.format(asset['id']))
def get_item_by_id(items, id):
    """Return the single item whose 'id' matches, or None.

    None is returned both when no item matches and when the id is
    ambiguous (more than one match).
    """
    matches = [item for item in items if item['id'] == id]
    return matches[0] if len(matches) == 1 else None
def download_asset(file, url):
    """Stream the resource at *url* into *file* (a pathlib.Path), 1 KiB at a time.

    FIX(review): the original opened the output file before issuing the
    request (leaving an empty file behind when the request failed) and
    never closed the streamed response, leaking the pooled connection.
    """
    r = requests.get(url, allow_redirects=True, stream=True)
    try:
        with file.open('wb') as fd:
            for chunk in r.iter_content(chunk_size=1024):
                fd.write(chunk)
    finally:
        r.close()  # release the connection even if a write fails
def get_downloadable_assets(item):
    """Return the list of downloadable image assets for a post.

    A plain image post yields a one-element list containing the post
    itself; a carousel yields its media list. Anything else raises.
    """
    if 'image_versions2' in item:
        return [item]
    if 'carousel_media' in item:
        return item['carousel_media']
    raise Exception('Not an image or a carousel')
# In[122]:
def download_item(item):
    """Download every image of one post into DATA_DIR.

    Returns a deep copy of the post dict with 'downloaded_image' set to
    the local path of its (last) image, or None when anything failed —
    the broad except keeps one bad post from killing the whole Pool run.
    """
    try:
        result = deepcopy(item)  # never mutate the caller's dict
        item_id = result['id']
        assets = get_downloadable_assets(result)
        for index, asset in enumerate(assets):
            # carousel images get _1, _2, ... suffixes; a single image gets none
            image_idx = index + 1 if len(assets) > 1 else 0
            image_file = get_image_file(item_id, image_idx)
            candidate = get_biggest_file(asset)
            if not image_file.exists():
                # FIX(review): tolerate posts without a 'caption' key and
                # use `is not None` instead of `!= None`.
                caption = item.get('caption')
                title = caption['text'] if caption is not None else ''
                url = candidate['url']
                logger.debug('Downloading {}...'.format(title))
                download_asset(image_file, url)
                logger.debug(f'Finished {image_file}')
            else:
                logger.debug('Already downloaded, skipping')
            # FIX(review): record the local path for already-downloaded
            # images too; the original only set it on fresh downloads.
            result['downloaded_image'] = str(image_file)
        return result
    except Exception as e:
        logger.warning('Error downloading item {} ({})'.format(item['id'], e))
        return None
# In[128]:
# Bulk-download the first 500 posts with 5 worker processes.
# Verbosity is lowered to INFO for the run and restored to DEBUG after.
# NOTE(review): a module-level multiprocessing.Pool with no
# `if __name__ == '__main__':` guard breaks on spawn-start platforms
# (Windows, macOS default since 3.8) — confirm this only runs on
# fork-based Unix or inside a notebook kernel.
logger.setLevel(logging.INFO)
with Pool(5) as p:
    logger.info('Starting the download...')
    results = p.map(download_item, items[:500])
    logger.info('Finished!')
logger.setLevel(logging.DEBUG)
# In[129]:
#id = '1709015328090358891_51337672'
#item = get_item_by_id(items,id)
#download_item(item)
# In[ ]:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment