Like util for fellows

"""Module that handles the like features"""
import re
import random
from math import ceil
from re import findall

from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import WebDriverException
from selenium.common.exceptions import NoSuchElementException

from .time_util import sleep
from .util import update_activity
from .util import add_user_to_blacklist
from .util import click_element
from .util import formatNumber


def get_links_from_feed(browser, amount, num_of_search, logger):
    """Fetches a random number of links from the feed and returns them as a list"""
    # NOTE: `amount` is accepted but not currently used by this function
    browser.get('https://www.instagram.com')
    # update server calls
    update_activity()

    sleep(2)

    for i in range(num_of_search + 1):
        browser.execute_script(
            "window.scrollTo(0, document.body.scrollHeight);")
        sleep(2)

    # get links
    link_elems = browser.find_elements_by_xpath(
        "//article/div[2]/div[2]/a")

    total_links = len(link_elems)
    logger.info("Total links fetched for analysis: {}".format(total_links))
    links = []
    try:
        if link_elems:
            links = [link_elem.get_attribute('href') for link_elem in link_elems]
            logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
            for i, link in enumerate(links):
                print(i, link)
            logger.info("~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    except BaseException as e:
        logger.error("link_elems error {}".format(str(e)))
    return links
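
# Usage sketch (not part of the gist): assuming this file sits inside the
# InstaPy package so its relative imports resolve, and that a logged-in
# selenium session already exists, the feed helper would be driven roughly
# like this. The chromedriver setup and logger name are illustrative only.
#
#     import logging
#     from selenium import webdriver
#
#     logger = logging.getLogger('like_util_demo')
#     browser = webdriver.Chrome()          # assumes chromedriver is on PATH
#     # ... log in to Instagram here ...
#     links = get_links_from_feed(browser, amount=50,
#                                 num_of_search=3, logger=logger)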


def get_links_for_location(browser,
                           location,
                           amount,
                           logger,
                           media=None,
                           skip_top_posts=True):
    """Fetches the number of links specified
    by amount and returns a list of links"""
    if media is None:
        # All known media types
        media = ['', 'Post', 'Video']
    elif media == 'Photo':
        # Include posts with multiple images in them
        media = ['', 'Post']
    else:
        # Make it a list so it can be used in the filtering below
        media = [media]

    browser.get('https://www.instagram.com/explore/locations/' + location)
    # update server calls
    update_activity()
    sleep(2)

    # clicking load more
    body_elem = browser.find_element_by_tag_name('body')
    sleep(2)

    abort = True
    try:
        load_button = body_elem.find_element_by_xpath(
            '//a[contains(@class, "_1cr2e _epyes")]')
    except NoSuchElementException:
        try:
            # scroll down to load posts
            for i in range(int(ceil(amount / 12))):
                browser.execute_script(
                    "window.scrollTo(0, document.body.scrollHeight);")
                sleep(2)
        except WebDriverException:
            logger.warning(
                'Load button not found, working with current images!')
        else:
            abort = False
            body_elem.send_keys(Keys.END)
            sleep(2)
            # update server calls
            update_activity()
    else:
        abort = False
        body_elem.send_keys(Keys.END)
        sleep(2)
        click_element(browser, load_button)  # load_button.click()
        # update server calls
        update_activity()

    body_elem.send_keys(Keys.HOME)
    sleep(1)

    # Get links
    if skip_top_posts:
        main_elem = browser.find_element_by_xpath('//main/article/div[2]')
    else:
        main_elem = browser.find_element_by_tag_name('main')
    link_elems = main_elem.find_elements_by_tag_name('a')
    total_links = len(link_elems)
    links = [link_elem.get_attribute('href') for link_elem in link_elems
             if link_elem.text in media]
    filtered_links = len(links)

    while (filtered_links < amount) and not abort:
        amount_left = amount - filtered_links
        # Average items of the right media type per page loaded
        new_per_page = ceil(12 * filtered_links / total_links)
        if new_per_page == 0:
            # Avoid division by zero
            new_per_page = 1. / 12.
        # Number of page loads needed
        new_needed = int(ceil(amount_left / new_per_page))

        if new_needed > 12:
            # Don't go bananas trying to get all of Instagram!
            new_needed = 12

        for i in range(new_needed):  # add images x * 12
            # Keep the latest window active while loading more posts
            before_load = total_links
            body_elem.send_keys(Keys.END)
            # update server calls
            update_activity()
            sleep(1)
            body_elem.send_keys(Keys.HOME)
            sleep(1)
            link_elems = main_elem.find_elements_by_tag_name('a')
            total_links = len(link_elems)
            abort = (before_load == total_links)
            if abort:
                break

        links = [link_elem.get_attribute('href') for link_elem in link_elems
                 if link_elem.text in media]
        filtered_links = len(links)

    return links[:amount]
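
# The scroll estimation above can be sanity-checked in isolation. This is a
# hedged sketch of the same arithmetic (the names mirror the loop above; the
# sample numbers are made up): roughly 12 anchors load per scroll, and only
# the fraction matching `media` counts toward the goal.
#
#     from math import ceil
#
#     amount, filtered_links, total_links = 100, 18, 36
#     new_per_page = ceil(12 * filtered_links / total_links)            # -> 6
#     new_needed = int(ceil((amount - filtered_links) / new_per_page))  # -> 14
#     new_needed = min(new_needed, 12)  # capped at 12, as in the loop above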


def get_links_for_tag(browser,
                      tag,
                      amount,
                      logger,
                      media=None,
                      skip_top_posts=True):
    """Fetches the number of links specified
    by amount and returns a list of links"""
    if media is None:
        # All known media types
        media = ['', 'Post', 'Video']
    elif media == 'Photo':
        # Include posts with multiple images in them
        media = ['', 'Post']
    else:
        # Make it a list so it can be used in the filtering below
        media = [media]

    browser.get('https://www.instagram.com/explore/tags/'
                + (tag[1:] if tag[:1] == '#' else tag))
    # update server calls
    update_activity()
    sleep(2)

    top_elements = browser.find_element_by_xpath('//main/article/div[1]')
    top_posts = top_elements.find_elements_by_tag_name('a')
    sleep(1)

    if skip_top_posts:
        main_elem = browser.find_element_by_xpath('//main/article/div[2]')
    else:
        main_elem = browser.find_element_by_tag_name('main')
    link_elems = main_elem.find_elements_by_tag_name('a')
    sleep(1)
    if not link_elems:  # this tag does not have `Top Posts`, or it really is empty
        main_elem = browser.find_element_by_xpath('//main/article/div[1]')
        top_posts = []
    sleep(2)

    possible_posts = formatNumber(browser.find_element_by_xpath(
        "//span[contains(@class, '_fd86t')]").text)
    logger.info("desired amount: {} | top posts [{}]: {} | possible posts: {}".format(
        amount, ('enabled' if not skip_top_posts else 'disabled'),
        len(top_posts), possible_posts))
    possible_posts = (possible_posts if not skip_top_posts
                      else possible_posts - len(top_posts))
    # Pages sometimes report more posts than they really hold, possibly
    # because deleted posts are still counted toward the tag
    amount = possible_posts if amount > possible_posts else amount

    # Get links
    links = get_links(browser, tag, logger, media, main_elem)

    filtered_links = len(links)
    try_again = 0
    sc_rolled = 0
    nap = 1.5
    while filtered_links in range(1, amount):
        if sc_rolled > 100:
            logger.info("Scrolled too much! ~ sleeping a bit :>")
            sleep(600)
            sc_rolled = 0
        for i in range(3):
            browser.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);")
            sc_rolled += 1
            update_activity()
            # without this nap on a slow connection, Instagram scrolls only
            # once no matter how many scroll commands are sent
            sleep(nap)
        sleep(3)
        links.extend(get_links(browser, tag, logger, media, main_elem))

        # uniquify links while preserving order
        links_all = links
        s = set()
        links = []
        for i in links_all:
            if i not in s:
                s.add(i)
                links.append(i)

        if len(links) == filtered_links:
            try_again += 1
            nap = 3 if try_again == 1 else 5
            logger.info("Insufficient amount of links ~ trying again: {}".format(try_again))
            sleep(3)
            if try_again > 2:  # raise this number to keep trying longer
                logger.info("\n'{}' tag POSSIBLY has fewer images than desired...".format(
                    tag[1:] if tag[:1] == '#' else tag))
                break
        else:
            filtered_links = len(links)
            try_again = 0
            nap = 1.5

    sleep(4)
    return links[:amount]
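
# The seen-set loop above is the classic order-preserving de-duplication
# idiom. As a hedged aside (not from the gist): on CPython 3.6+ dicts
# preserve insertion order, so the same result fits in one line:
#
#     links = ['a', 'b', 'a', 'c', 'b']
#     links = list(dict.fromkeys(links))   # -> ['a', 'b', 'c']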


def get_links_for_username(browser,
                           username,
                           amount,
                           logger,
                           randomize=False,
                           media=None):
    """Fetches the number of links specified
    by amount and returns a list of links"""
    if media is None:
        # All known media types
        media = ['', 'Post', 'Video']
    elif media == 'Photo':
        # Include posts with multiple images in them
        media = ['', 'Post']
    else:
        # Make it a list so it can be used in the filtering below
        media = [media]

    logger.info('Getting {} image list...'.format(username))

    # Get user profile page
    browser.get('https://www.instagram.com/' + username)
    # update server calls
    update_activity()

    body_elem = browser.find_element_by_tag_name('body')

    try:
        is_private = body_elem.find_element_by_xpath(
            '//h2[@class="_kcrwx"]')
    except NoSuchElementException:
        logger.info('Interaction begin...')
    else:
        if is_private:
            logger.warning('This user is private...')
            return False

    abort = True
    try:
        load_button = body_elem.find_element_by_xpath(
            '//a[contains(@class, "_1cr2e _epyes")]')
    except NoSuchElementException:
        try:
            # scroll down to load posts
            for i in range(int(ceil(amount / 12))):
                browser.execute_script(
                    "window.scrollTo(0, document.body.scrollHeight);")
                sleep(2)
        except WebDriverException:
            logger.warning(
                'Load button not found, working with current images!')
        else:
            abort = False
            body_elem.send_keys(Keys.END)
            sleep(2)
            # update server calls
            update_activity()
    else:
        abort = False
        body_elem.send_keys(Keys.END)
        sleep(2)
        click_element(browser, load_button)  # load_button.click()
        # update server calls
        update_activity()

    body_elem.send_keys(Keys.HOME)
    sleep(2)

    # Get links
    main_elem = browser.find_element_by_tag_name('main')
    link_elems = main_elem.find_elements_by_tag_name('a')
    total_links = len(link_elems)
    links = []
    filtered_links = 0
    try:
        if link_elems:
            links = [link_elem.get_attribute('href') for link_elem in link_elems
                     if link_elem and link_elem.text in media]
            filtered_links = len(links)
    except BaseException as e:
        logger.error("link_elems error {}".format(str(e)))

    if randomize:
        # Expand the population for a better random distribution
        amount = amount * 5

    while (filtered_links < amount) and not abort:
        amount_left = amount - filtered_links
        # Average items of the right media type per page loaded
        new_per_page = ceil(12 * filtered_links / total_links)
        if new_per_page == 0:
            # Avoid division by zero
            new_per_page = 1. / 12.
        # Number of page loads needed
        new_needed = int(ceil(amount_left / new_per_page))

        if new_needed > 12:
            # Don't go bananas trying to get all of Instagram!
            new_needed = 12

        for i in range(new_needed):  # add images x * 12
            # Keep the latest window active while loading more posts
            before_load = total_links
            body_elem.send_keys(Keys.END)
            # update server calls
            update_activity()
            sleep(1)
            body_elem.send_keys(Keys.HOME)
            sleep(1)
            link_elems = main_elem.find_elements_by_tag_name('a')
            total_links = len(link_elems)
            abort = (before_load == total_links)
            if abort:
                break

        links = [link_elem.get_attribute('href') for link_elem in link_elems
                 if link_elem.text in media]
        filtered_links = len(links)

    if randomize:
        # Shuffle the population index
        links = random.sample(links, filtered_links)

    return links[:amount]
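
# A note on randomize=True: the function oversamples by 5x before shuffling,
# and random.sample returns a new list in random order when its second
# argument equals the population size. A tiny, hedged illustration with
# made-up data:
#
#     import random
#
#     links = ['p1', 'p2', 'p3', 'p4']
#     shuffled = random.sample(links, len(links))  # e.g. ['p3', 'p1', 'p4', 'p2']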


def check_link(browser,
               link,
               dont_like,
               ignore_if_contains,
               ignore_users,
               username,
               like_by_followers_upper_limit,
               like_by_followers_lower_limit,
               logger):
    browser.get(link)
    # update server calls
    update_activity()
    sleep(2)

    """Check if the Post is Valid/Exists"""
    try:
        post_page = browser.execute_script(
            "return window._sharedData.entry_data.PostPage")
    except WebDriverException:  # selenium exception
        try:
            # refresh the page (you would refresh twice (or more), too)
            # method 1 ~ browser.get(link): if the page did not load properly,
            #   it cannot simply be reloaded; it must be navigated to again
            # method 2 ~ the page loaded properly and can be reloaded in place
            browser.execute_script("location.reload()")
            post_page = browser.execute_script(
                "return window._sharedData.entry_data.PostPage")
        except WebDriverException:
            post_page = None

    if post_page is None:
        logger.warning('Unavailable Page: {}'.format(link.encode('utf-8')))
        return True, None, None, 'Unavailable Page'

    """Gets the description of the link and checks for the dont_like tags"""
    graphql = 'graphql' in post_page[0]
    if graphql:
        media = post_page[0]['graphql']['shortcode_media']
        is_video = media['is_video']
        user_name = media['owner']['username']
        image_text = media['edge_media_to_caption']['edges']
        image_text = image_text[0]['node']['text'] if image_text else None
        owner_comments = browser.execute_script('''
            latest_comments = window._sharedData.entry_data.PostPage[0].graphql.shortcode_media.edge_media_to_comment.edges;
            if (latest_comments === undefined) latest_comments = Array();
            owner_comments = latest_comments
                .filter(item => item.node.owner.username == '{}')
                .map(item => item.node.text)
                .reduce((item, total) => item + '\\n' + total, '');
            return owner_comments;
        '''.format(user_name))
    else:
        media = post_page[0]['media']
        is_video = media['is_video']
        user_name = media['owner']['username']
        image_text = media['caption']
        owner_comments = browser.execute_script('''
            latest_comments = window._sharedData.entry_data.PostPage[0].media.comments.nodes;
            if (latest_comments === undefined) latest_comments = Array();
            owner_comments = latest_comments
                .filter(item => item.user.username == '{}')
                .map(item => item.text)
                .reduce((item, total) => item + '\\n' + total, '');
            return owner_comments;
        '''.format(user_name))

    if owner_comments == '':
        owner_comments = None

    """Append owner comments to description as they might contain further tags"""
    if image_text is None:
        image_text = owner_comments
    elif owner_comments:
        image_text = image_text + '\n' + owner_comments

    """If the image still has no description, get the first comment"""
    if image_text is None:
        if graphql:
            image_text = media['edge_media_to_comment']['edges']
            image_text = image_text[0]['node']['text'] if image_text else None
        else:
            image_text = media['comments']['nodes']
            image_text = image_text[0]['text'] if image_text else None
    if image_text is None:
        image_text = "No description"

    logger.info('Image from: {}'.format(user_name.encode('utf-8')))

    """Find the number of followers the user has"""
    if like_by_followers_upper_limit or like_by_followers_lower_limit:
        userlink = 'https://www.instagram.com/' + user_name
        browser.get(userlink)
        # update server calls
        update_activity()
        sleep(1)

        try:
            num_followers = browser.execute_script(
                "return window._sharedData.entry_data."
                "ProfilePage[0].user.followed_by.count")
        except WebDriverException:
            try:
                browser.execute_script("location.reload()")
                num_followers = browser.execute_script(
                    "return window._sharedData.entry_data."
                    "ProfilePage[0].user.followed_by.count")
            except WebDriverException:
                num_followers = 'undefined'
                # disable the limits so the checks below are skipped
                like_by_followers_lower_limit = None
                like_by_followers_upper_limit = None

        browser.get(link)
        # update server calls
        update_activity()
        sleep(1)
        logger.info('Number of Followers: {}'.format(num_followers))

        if like_by_followers_upper_limit and \
           num_followers > like_by_followers_upper_limit:
            return True, user_name, is_video, \
                'Number of followers exceeds limit'

        if like_by_followers_lower_limit and \
           num_followers < like_by_followers_lower_limit:
            return True, user_name, is_video, \
                'Number of followers does not reach minimum'

    logger.info('Link: {}'.format(link.encode('utf-8')))
    logger.info('Description: {}'.format(image_text.encode('utf-8')))

    """Check if the user_name is in the ignore_users list"""
    if (user_name in ignore_users) or (user_name == username):
        return True, user_name, is_video, 'Username'

    if any((word in image_text for word in ignore_if_contains)):
        return True, user_name, is_video, 'None'

    dont_like_regex = []

    for dont_likes in dont_like:
        if dont_likes.startswith("#"):
            # exact tag
            dont_like_regex.append(dont_likes + r"([^\d\w]|$)")
        elif dont_likes.startswith("["):
            # tags starting with the word
            dont_like_regex.append("#" + dont_likes[1:] + r"[\d\w]+([^\d\w]|$)")
        elif dont_likes.startswith("]"):
            # tags ending with the word
            dont_like_regex.append(r"#[\d\w]+" + dont_likes[1:] + r"([^\d\w]|$)")
        else:
            # tags containing the word anywhere
            dont_like_regex.append(
                r"#[\d\w]*" + dont_likes + r"[\d\w]*([^\d\w]|$)")

    for dont_likes_regex in dont_like_regex:
        quash = re.search(dont_likes_regex, image_text, re.IGNORECASE)
        if quash:
            quashed = (quash.group(0)).split('#')[1]
            iffy = (re.split(r'\W+', dont_likes_regex))[3]
            inapp_unit = ('Inappropriate! ~ contains \'{}\''.format(quashed)
                          if quashed == iffy else
                          'Inappropriate! ~ contains \'{}\' in \'{}\''.format(iffy, quashed))
            return True, user_name, is_video, inapp_unit

    return False, user_name, is_video, 'None'
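
# How the dont_like patterns behave, as a hedged illustration (the tags and
# caption are made up): a plain word matches anywhere inside a tag, "#word"
# matches the exact tag, "[word" matches tags starting with the word, and
# "]word" matches tags ending with it.
#
#     import re
#
#     caption = "sunset #travelgram #nofilter"
#     patterns = [
#         r"#travel[\d\w]+([^\d\w]|$)",       # built from '[travel'
#         r"#[\d\w]+filter([^\d\w]|$)",       # built from ']filter'
#         r"#[\d\w]*gram[\d\w]*([^\d\w]|$)",  # built from plain 'gram'
#     ]
#     for p in patterns:
#         print(bool(re.search(p, caption, re.IGNORECASE)))  # True, True, True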


def like_image(browser, username, blacklist, logger, logfolder):
    """Likes the browser opened image"""
    like_elem = browser.find_elements_by_xpath(
        "//a[@role='button']/span[text()='Like']/..")
    liked_elem = browser.find_elements_by_xpath(
        "//a[@role='button']/span[text()='Unlike']")

    if len(like_elem) == 1:
        # sleep real quick right before clicking the element
        sleep(2)
        click_element(browser, like_elem[0])
        logger.info('--> Image Liked!')
        update_activity('likes')
        if blacklist['enabled'] is True:
            action = 'liked'
            add_user_to_blacklist(
                browser, username, blacklist['campaign'], action, logger
            )
        sleep(2)
        return True
    elif len(liked_elem) == 1:
        logger.info('--> Already Liked!')
        return False
    else:
        logger.info('--> Invalid Like Element!')
        return False
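
# like_image expects `blacklist` to be a dict carrying at least the two keys
# read above. A hedged usage sketch (the campaign name is a placeholder):
#
#     blacklist = {'enabled': True, 'campaign': 'spring_campaign'}
#     liked = like_image(browser, 'some_user', blacklist, logger, logfolder)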


def get_tags(browser, url):
    """Gets all the tags of the given description in the url"""
    browser.get(url)
    # update server calls
    update_activity()
    sleep(1)

    graphql = browser.execute_script(
        "return ('graphql' in window._sharedData.entry_data.PostPage[0])")
    if graphql:
        image_text = browser.execute_script(
            "return window._sharedData.entry_data.PostPage[0].graphql."
            "shortcode_media.edge_media_to_caption.edges[0].node.text")
    else:
        image_text = browser.execute_script(
            "return window._sharedData.entry_data."
            "PostPage[0].media.caption.text")

    tags = findall(r'#\w*', image_text)
    return tags
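
# The tag extraction boils down to findall(r'#\w*', ...), where \w covers
# letters, digits, and underscores. A quick illustration with a made-up
# caption:
#
#     from re import findall
#
#     caption = "golden hour #sunset #beach_life #2018"
#     print(findall(r'#\w*', caption))  # ['#sunset', '#beach_life', '#2018']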


def get_links(browser, tag, logger, media, element):
    # Get image links in scope from tags
    link_elems = element.find_elements_by_tag_name('a')
    sleep(2)
    links = []
    try:
        if link_elems:
            new_links = [link_elem.get_attribute('href') for link_elem in link_elems
                         if link_elem and link_elem.text in media]
            links.extend(new_links)
        else:
            logger.info("'{}' tag does not contain a picture".format(
                tag[1:] if tag[:1] == '#' else tag))
    except BaseException as e:
        logger.error("link_elems error {}".format(str(e)))
    return links
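
# Putting the pieces together ~ a hedged end-to-end sketch of how these
# helpers chain, assuming a logged-in selenium session and this module being
# importable inside InstaPy (limits, account names, and the blacklist /
# logfolder variables below are illustrative):
#
#     links = get_links_for_tag(browser, '#nature', 20, logger)
#     for link in links:
#         inappropriate, user_name, is_video, reason = check_link(
#             browser, link, dont_like=['follow4follow'],
#             ignore_if_contains=[], ignore_users=[], username='my_account',
#             like_by_followers_upper_limit=5000,
#             like_by_followers_lower_limit=50, logger=logger)
#         if not inappropriate:
#             like_image(browser, user_name, blacklist, logger, logfolder)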