Crawl IG hashtag posts, capturing the post link, main image URL, thumbnail, is_video flag, username, likes, caption, posted time, and user profile image URL. Only the spider implementation is shown here; the item pipeline needs to be implemented separately (a minimal sketch follows the spider code).
import json
import urllib.parse as urlparse
from scrapy import Item, Field, Spider, Request
from scrapy.exceptions import CloseSpider
# ------ Instagram settings ------
IG_SERVER = "https://www.instagram.com"
IG_PATH = '/explore/tags/'
IG_POST_PATH = "p/"
IG_DEFAULT_PARAM = '?__a=1'
IG_CURSOR_PARAM = '&max_id='
IG_CURSOR_NODE = ['graphql', 'hashtag', 'edge_hashtag_to_media', 'page_info', 'end_cursor']
IG_NEXT_NODE = ['graphql', 'hashtag', 'edge_hashtag_to_media', 'page_info', 'has_next_page']
# define post nodes and extract post items
IG_POSTS_NODE = ['graphql', 'hashtag', 'edge_hashtag_to_media', 'edges']
IG_SHORT_CUT = ['shortcode']
IG_ITEM = {
    "url": ['display_url'],
    "thumbnail_url": ['thumbnail_src'],
    "posted": ['taken_at_timestamp'],
    "like": ["edge_media_preview_like", "count"],
    "caption": ['edge_media_to_caption', 'edges', 0, 'node', 'text'],
    "is_video": ['is_video']
}
# define a post node from post page and extract user info
IG_POST_NODE = ['graphql', 'shortcode_media']
IG_USER_NAME = IG_POST_NODE + ['owner', 'username']
IG_USER_PROFILE = IG_POST_NODE + ['owner', 'profile_pic_url']
# ------ End of Instagram settings ------
class ImageItem(Item):
    url = Field()
    thumbnail_url = Field()
    username = Field()
    link = Field()
    posted = Field(serializer=int)
    like = Field(serializer=int)
    caption = Field()
    profile = Field()
    is_video = Field(serializer=bool)

class HashtagSpider(Spider):
    name = "hashtag_spider"

    def __init__(self, *, hashtag=None, max_times=None, **kwargs):
        """
        Crawl IG by hashtag.

        :param hashtag: hashtag name without '#',
            e.g. hashtag='aloha' for posts tagged #aloha
        :param max_times: maximum number of API calls;
            each call returns roughly 60 items,
            so to get 300 posts set max_times to 5
        :param kwargs: overrides for the default settings

        Note: a pipeline can set close_down to True to stop the
        crawler, e.g. to avoid collecting duplicate posts (see the
        pipeline sketch after the spider code).
        """
        super().__init__(**kwargs)
        for (k, v) in kwargs.items():
            setattr(self, k, v)
        self.SERVER = kwargs.get('server', IG_SERVER)
        self.PATH = kwargs.get('path', IG_PATH)
        self.DEFAULT_PARAM = kwargs.get('param', IG_DEFAULT_PARAM)
        # the original read the 'param' key here as well, which made the
        # cursor parameter impossible to override; use a distinct key
        self.PARAM_NAME = kwargs.get('cursor_param', IG_CURSOR_PARAM)
        self.POST_PATH = kwargs.get('user_url', IG_POST_PATH)
        self.POST_NODE = kwargs.get('user_node', IG_POST_NODE)
        self.CURSOR_NODE = kwargs.get('id_node', IG_CURSOR_NODE)
        self.NEXT_NODE = kwargs.get('next_node', IG_NEXT_NODE)
        self.NODE = kwargs.get('node', IG_POSTS_NODE)
        self.ITEM = kwargs.get('item', IG_ITEM)
        self.SHORTCUT = kwargs.get('shortcut', IG_SHORT_CUT)
        self.USERNAME = kwargs.get('username', IG_USER_NAME)
        self.USERPROFILE = kwargs.get('profile', IG_USER_PROFILE)
        if hashtag:
            self.start_urls = [self.get_url(hashtag)]
        # -a arguments from the scrapy CLI arrive as strings
        self.max = int(max_times) if max_times else 1
        self.count = 0
        self.close_down = False

    def get_item(self, node):
        item = dict()
        for (k, v) in self.ITEM.items():
            item[k] = get_property(node, v)
        item['link'] = urlparse.urljoin(
            self.SERVER, self.POST_PATH + get_property(node, self.SHORTCUT) + '/')
        return ImageItem(item)

    def parse(self, response):
        data = json.loads(response.text)
        next_id = self.get_next_id(data)
        has_next = self.has_next_page(data)
        self.count += 1
        nodes = get_property(data, self.NODE)
        if nodes:
            for node in nodes:
                node = node.get('node')
                item = self.get_item(node)
                # fetch the post owner's username and profile picture;
                # if user info is not needed, yield the item directly here
                request = Request(url=self.get_user_url(get_property(node, self.SHORTCUT)),
                                  callback=self.parse_user, errback=self.errback_user)
                request.meta['item'] = item
                yield request
        if next_id and has_next and self.count < self.max:
            yield Request(url=self.get_next_url(response.url, next_id), callback=self.parse)
        if self.close_down:
            raise CloseSpider(reason='Too many duplicate images, maximum number exceeded')

    def parse_user(self, response):
        item = response.meta['item']
        try:
            data = json.loads(response.text)
            item["username"] = get_property(data, self.USERNAME)
            item["profile"] = get_property(data, self.USERPROFILE)
        except ValueError:
            # response body was not valid JSON; keep the item without user info
            pass
        yield item

    def errback_user(self, failure):
        # a failed user request still yields the item, just without user info
        # (the original routed errors to parse_user, which expects a Response,
        # not a twisted Failure)
        yield failure.request.meta['item']

    def get_url(self, hashtag):
        return urlparse.urljoin(self.SERVER, self.PATH + hashtag + '/' + self.DEFAULT_PARAM)

    def get_next_url(self, url, next_id):
        return urlparse.urljoin(url.split('?')[0], self.DEFAULT_PARAM + self.PARAM_NAME + next_id)

    def get_user_url(self, short_code):
        return urlparse.urljoin(self.SERVER, self.POST_PATH + short_code + '/' + self.DEFAULT_PARAM)

    def has_next_page(self, data):
        return get_property(data, self.NEXT_NODE)

    def get_next_id(self, data):
        return get_property(data, self.CURSOR_NODE)

def get_property(data, path):
    """Walk a nested dict/list structure along path, returning None as soon
    as any step is missing or the current value has the wrong type."""
    v = data
    for p in path:
        if isinstance(p, int):
            v = v[p] if isinstance(v, list) and len(v) > p else None
        elif isinstance(p, str):
            # guard against non-dict values (the original crashed here)
            v = v.get(p) if isinstance(v, dict) else None
        if v is None:
            break
    return v
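
As the description notes, the item pipeline is not part of this gist and must be implemented separately. Below is a minimal sketch of such a pipeline, assuming posts are deduplicated by their link field; it uses the close_down flag that parse() checks to stop the crawl. DuplicateFilterPipeline and the max_duplicates threshold are hypothetical names introduced here for illustration.

from scrapy.exceptions import DropItem

class DuplicateFilterPipeline:
    def __init__(self):
        self.seen = set()          # links already processed in this run
        self.duplicates = 0        # consecutive duplicates seen so far
        self.max_duplicates = 20   # arbitrary threshold for this sketch

    def process_item(self, item, spider):
        link = item.get('link')
        if link in self.seen:
            self.duplicates += 1
            if self.duplicates >= self.max_duplicates:
                # ask the spider to stop; parse() raises CloseSpider on
                # the next response it handles
                spider.close_down = True
            raise DropItem('duplicate post: %s' % link)
        self.seen.add(link)
        self.duplicates = 0
        return item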
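
To run the spider stand-alone, something like the following should work. The module paths (hashtag_spider, pipelines) and the settings values are assumptions to adapt to your own layout.

from scrapy.crawler import CrawlerProcess
from hashtag_spider import HashtagSpider  # module name is an assumption

process = CrawlerProcess(settings={
    # wire in the sketch pipeline above; the dotted path is hypothetical
    'ITEM_PIPELINES': {'pipelines.DuplicateFilterPipeline': 300},
})
# 5 calls * ~60 items per call gives roughly 300 posts, per the docstring
process.crawl(HashtagSpider, hashtag='aloha', max_times=5)
process.start()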