Skip to content

Instantly share code, notes, and snippets.

@leferrad
Created September 22, 2017 05:27
Show Gist options
  • Save leferrad/426d80110a16c1de4bcf02b3ada7b1bf to your computer and use it in GitHub Desktop.
Extract images from 9GAG through a simple interactive main in Python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Script to get images from 9gag, based on https://github.com/sashgorokhov/python-ninegag"""
# Python 2 and 3 compatibility
from __future__ import print_function
from builtins import input
from bs4 import BeautifulSoup
import os
import re
import requests
import time
try:
import urlparse
except ImportError:
from urllib import parse as urlparse
# Constant values
LONGPOST_HEIGHT_MIN = 1000  # images taller than this (px) are classified as 'longpost' instead of 'image'
BASE_URL = 'http://9gag.com/'  # root URL that relative hrefs are joined against
# Cache variables (module-level mutable state shared by the functions below)
_sections = None  # lazily filled by menu_sections(): mapping of section name -> section url
_cache = dict()  # type: dict[str, tuple[int, dict]]  # article id -> (insert timestamp, article data), evicted by _cache_article
# --- Exception classes ---
class NineGagError(Exception):
    """Base class for every 9GAG-related error raised by this script."""
class UnknownArticleType(NineGagError):
    """Raised when an article holds media that is neither a gif nor an image."""
class NotSafeForWork(NineGagError):
    """Raised when a post has no public container (NSFW posts hide it)."""
# --------------------------
# --- Extraction functions ---
def _bs_from_response(html):
    """
    Build a BeautifulSoup document from a string of HTML.

    :param html: str -- raw HTML markup
    :return: bs4.BeautifulSoup
    """
    soup = BeautifulSoup(html, "html.parser")
    return soup
def _bs_from_url(url):
    """
    Fetch *url* over HTTP and parse the response body into BeautifulSoup.
    Shortcut around requests.get + _bs_from_response.

    :param url: str
    :return: bs4.BeautifulSoup
    """
    response = requests.get(url)
    return _bs_from_response(response.text)
def get_sections():
    """
    Scrape the 9GAG landing page and return the available sections.

    :return: dict mapping section name -> section url
    """
    bs = _bs_from_url(BASE_URL)
    # Primary nav entries, dropping the first and last (non-section) items.
    entries = bs.find(attrs='nav-menu').find(attrs='primary').find_all('li')[1:-1]
    entries.extend(bs.find_all(attrs="badge-section-menu-items"))
    sections = {}
    for entry in entries:
        sections[entry.a.text.strip()] = entry.a['href']
    return sections
def _get_gif(container):
    """
    Return a dict with the source gif url under key ``url`` and ``type`` set
    to "gif", or None when the container holds no animated content.

    :param bs4.Tag container:
    :rtype: dict|None
    """
    animated = container.find(attrs='badge-animated-container-animated')
    if animated:
        return {'url': animated['data-image'], 'type': 'gif'}
    return None
def _get_image(container):
    """
    Return dict with key ``url`` holding the source image url and ``type``
    set to 'image' if the rendered height is at most LONGPOST_HEIGHT_MIN,
    or 'longpost' otherwise. Returns None when no image can be extracted.

    :param bs4.Tag container: post container tag from the listing page
    :rtype: dict|None
    """
    tag = container.find(attrs='badge-item-img')
    if not tag:
        return None
    # The listing page encodes the rendered height inside the anchor's
    # inline style; grab the first numeric token from it.
    style = container.a['style']
    match = re.search(r'[\d\.]+', style)
    if not match:
        return None
    height = float(match.group())
    # Avoid shadowing the builtin `type`; tall images are "longposts".
    img_type = 'longpost' if height > LONGPOST_HEIGHT_MIN else 'image'
    # Fetch the article's own page to obtain the full-resolution source URL.
    url = urlparse.urljoin(BASE_URL, container.a['href'])
    bs = _bs_from_url(url)
    tag = bs.find(attrs='badge-item-img')
    if tag is None:
        # Detail page did not expose the image tag (layout change or removed
        # post) -- previously this crashed with AttributeError.
        return None
    return {'url': tag['src'], 'type': img_type}
def _cache_article(func):
    """
    Decorator memoizing article data by ``article['id']`` in the module-level
    ``_cache`` dict. functools.lru_cache is not usable here because the
    decorated function's argument is unhashable.

    When the cache grows past 100 entries, the 20 oldest entries (by insert
    timestamp) are evicted.

    :param func: function taking an ``article`` (indexable by 'id' -- looks
        like a bs4.Tag whose 'id' attribute keys the cache; verify at caller)
    :rtype: function
    """
    from functools import wraps  # local import: keeps this block self-contained

    @wraps(func)  # preserve the wrapped function's name/docstring
    def wrapper(article):
        if article['id'] in _cache:
            return _cache[article['id']][1]
        data = func(article=article)
        _cache[article['id']] = (time.time(), data)
        if len(_cache) > 100:
            # Evict the 20 least recently inserted entries.
            oldest = sorted(_cache.keys(), key=lambda key: _cache[key][0])
            for key in oldest[:20]:
                _cache.pop(key)
        return data
    return wrapper
@_cache_article
def _get_data(article):
    """
    Extract the media payload for an article: a dict with keys ``url`` and
    ``type``, trying the animated (gif) variant first and falling back to
    a static image.

    :param bs4.Tag article:
    :rtype: dict|None
    :raises NotSafeForWork: when the post container is absent from the page.
    """
    post_container = article.find(attrs='badge-post-container')
    if post_container is None:
        raise NotSafeForWork()
    media = _get_gif(post_container)
    if media is None:
        media = _get_image(post_container)
    return media
def _paginated_url(url, max_pages=1):
    """
    Yield ``(article_id, article_html)`` tuples from the JSON listing
    endpoint, following the "load more" pagination links.

    :param str url: listing URL to start from
    :param int|None max_pages: how many pages of results to parse. If None -
        all available. Default 1 - only first page.
    :rtype: collections.Iterable[tuple[str, dict]]
    """
    ajax_headers = {'Accept': 'application/json', 'X-Requested-With': 'XMLHttpRequest'}
    pages_parsed = 0
    while True:
        # Stop once the requested page budget is spent (None means no limit;
        # a falsy max_pages such as 0 yields nothing).
        if max_pages is not None and (not max_pages or pages_parsed >= max_pages):
            break
        pages_parsed += 1
        payload = requests.get(url, headers=ajax_headers).json()
        if not len(payload['ids']):
            break
        url = urlparse.urljoin(BASE_URL, payload['loadMoreUrl'])
        for article_id in payload['ids']:
            yield article_id, payload['items'][article_id]
def _parse_article(article):
    """
    Parse one article tag into a dict with keys: id, url, votes, comments,
    title, data. Returns None for NSFW posts and for unknown article types.

    :param bs4.Tag article:
    :rtype: dict|None
    """
    parsed = {
        'id': article['data-entry-id'],
        'url': article['data-entry-url'],
        'votes': article['data-entry-votes'],
        'comments': article['data-entry-comments'],
        'title': article.find(attrs='badge-item-title').a.text.strip(),
    }
    try:
        parsed['data'] = _get_data(article)
    except NotSafeForWork:
        print('[DEBUG] NSFW Post: {} {}'.format(parsed['id'], parsed['url']))
        return
    if not parsed['data']:
        print('[WARNING] Unknown article type of {}: {}'.format(parsed['id'], parsed['url']))
        return
    return parsed
def get_articles(url, max_pages=1, raise_on_error=False):
    """
    Yield parsed article dicts for every article found on the given url.

    :param str url:
    :param int|None max_pages: how many pages of results to parse. If None -
        all available. Default 1 - only first page.
    :param bool raise_on_error: re-raise parsing errors instead of only
        logging them and moving on.
    :rtype: collections.Iterable[dict]
    """
    for article_id, article_html in _paginated_url(url, max_pages=max_pages):
        try:
            parsed = _parse_article(_bs_from_response(article_html).article)
            if not parsed:
                print("[DEBUG] Empty data for article '%s'" % str(article_id))
                continue
            yield parsed
        except Exception as e:
            print("[ERROR] Error while parsing article '%s': %s" % (str(article_id), str(e)))
            if raise_on_error:
                raise
# ----------------------------
# ----- Utils -----
def get_filename(n, max_n, preffix='img_', extension='jpg'):
    """
    Build a filename of the form ``<preffix><zero-padded n>.<extension>``,
    padding ``n`` with zeros so it is as wide as ``max_n``.

    :param int n: index of the file (must satisfy n <= max_n)
    :param int max_n: maximum index, determines the padding width
    :param str preffix: filename prefix (spelling kept for compatibility)
    :param str extension: file extension without the leading dot
    :return: str
    :raises ValueError: if arguments are not integers or n > max_n
    """
    # Raise explicitly instead of assert: asserts vanish under `python -O`,
    # and the original `assert cond, ValueError(...)` raised AssertionError
    # anyway -- the intent was clearly a ValueError.
    if not (isinstance(n, int) and isinstance(max_n, int)):
        raise ValueError("Arguments 'n' and 'max_n' must be integers!")
    if n > max_n:
        raise ValueError("Argument 'n' must be less or equal than 'max_n'")
    width = len(str(max_n))
    return preffix + str(n).zfill(width) + '.' + extension
# =======================
# MENUS FUNCTIONS
# =======================
# Main menu
# Main menu
def main_menu():
    """Entry menu: clear the screen, greet the user and dispatch the choice."""
    os.system('clear')
    print("Welcome,\n")
    # BUG FIX: `raw_input` does not exist on Python 3 and would raise
    # NameError; the file already imports the py2/py3-compatible `input`
    # from `builtins`, so use it here as everywhere else.
    choice = input(" >> ")
    exec_menu(choice)
    return
# Execute menu
# Execute menu
def exec_menu(choice):
    """
    Dispatch a menu choice through the ``menu_actions`` table: an empty
    choice re-opens the main menu, an unknown one reports the error and
    then re-opens the main menu.
    """
    os.system('clear')
    selection = choice.lower()
    if selection == '':
        menu_actions['main_menu']()
        return
    if selection in menu_actions:
        menu_actions[selection]()
        return
    print("Invalid selection, please try again.\n")
    menu_actions['main_menu']()
    return
def reload_menu_sections():
    """
    Ask whether to process another section. On 'y' re-enter the sections
    menu; on empty input or 'n' just return (default is no).
    """
    while True:
        answer = input("> Do you want to process another section? (y/N) ")
        lowered = answer.lower()
        if answer == '' or lowered == 'n':
            # Default: no
            return
        if lowered == 'y':
            # Reload this menu
            menu_sections()
            return
        print('[ERROR] Invalid choice! Try again...')
def save_on_disk(r, path):
    """
    Write the raw bytes *r* to the file at *path* (binary mode).

    :param bytes r: content to persist
    :param str path: destination file path
    """
    with open(path, mode='wb') as out_file:
        out_file.write(r)
def get_data_from_url(url):
    """
    Download *url* and return the raw response body as bytes.

    :param str url:
    :return: bytes
    """
    response = requests.get(url)
    return response.content
def confirm_directory(directory):
    """
    Ensure *directory* exists, asking the user for permission to create it
    when it is missing.

    :param str directory: path to check/create
    :return: bool -- True when the directory exists or was created
    """
    confirmed = False
    if not os.path.exists(directory):
        create = input("> Directory '%s' does not exist. Do you want to create it? [Y/n]" % directory)
        if create == '' or create.lower() == 'y':
            try:
                os.makedirs(directory)
                confirmed = True
            except OSError:
                # Narrowed from a bare except: only filesystem failures
                # (permissions, races, invalid paths) mean "not confirmed";
                # a bare except would also swallow KeyboardInterrupt.
                confirmed = False
    else:
        print("[INFO] Using existing directory '%s'" % directory)
        confirmed = True
    return confirmed
def menu_save_results(results):
    """
    Ask the user for an output directory and download every result URL into
    it, numbering the files sequentially with zero-padded names.

    :param list[str] results: URLs of the media to download
    """
    directory = input("> Enter an output directory for the results: ")
    if not confirm_directory(directory):
        # Fall back to a default location when the directory could not be
        # confirmed.
        directory = '/tmp/imgs'
        if not os.path.exists(directory):
            os.mkdir(directory)
        print("[ERROR] Error trying to create directory! Using default: '%s'" % directory)
    n_results = len(results)
    for idx, result_url in enumerate(results):
        ext = result_url.split('.')[-1]
        fname = get_filename(idx, n_results, preffix='img_', extension=ext)
        fpath = os.path.join(directory, fname)
        print("[INFO] Saving result %i of %i into '%s'..." % (idx + 1, n_results, fpath))
        save_on_disk(get_data_from_url(result_url), fpath)
def menu_articles(section_url):
    """
    Interactive flow for a single section: ask how many items to download
    and which media type to keep, then collect matching result URLs and
    hand them to menu_save_results.

    :param str section_url: URL of the 9GAG section to process
    """
    # --- Ask for the max number of items (default 10) ---
    max_items = 10
    invalid_choice = True
    while invalid_choice:
        choice = input("> Enter the max of items to download [default=%i]: " % max_items)
        if choice == '':
            # Default value already set
            break
        try:
            max_items = int(choice)
            invalid_choice = False
        except ValueError:
            # Narrowed from a bare except: only a non-numeric entry is
            # invalid; a bare except would also swallow KeyboardInterrupt.
            print('[ERROR] Invalid choice! Try again...')
    data = get_articles(section_url, max_pages=None)  # Get all available pages and stop when a max of items is achieved
    # --- Ask for the desired result type (default 'image') ---
    result_types = ['gif', 'image', 'gif or image']
    print("[INFO] These are the supported types of result which can be downloaded: ")
    for i, res_t in enumerate(result_types):
        print(" %i. %s" % (i, res_t))
    print("")
    r_type = 'image'
    invalid_choice = True
    while invalid_choice:
        choice = input("> Select the type of result desired [default='%s']: " % r_type)
        if choice == '':
            # Default value already set
            break
        try:
            r_type = result_types[int(choice)]
            invalid_choice = False
        except (ValueError, IndexError):
            # Narrowed from a bare except: bad number or out-of-range index.
            print('[ERROR] Invalid choice! Try again...')
    # --- Collect up to max_items results of the accepted type ---
    result_data = []
    accepted_result = {'gif': lambda d: d['type'] == 'gif',
                       'image': lambda d: d['type'] == 'image',
                       'gif or image': lambda d: d['type'] == 'gif' or d['type'] == 'image'}
    for dd in data:
        if len(result_data) >= max_items:
            break
        print("[DEBUG] Processing a result...")
        if accepted_result[r_type](dd['data']):
            print("[DEBUG] -> Result belongs to the chosen type. Caching it...")
            # TODO: what if we also extract the title?
            result_data.append(dd['data']['url'])
        else:
            print("[DEBUG] -> Result does not belong to the chosen type. Ignoring it...")
    print("")
    print("[INFO] There are %i results extracted." % len(result_data))
    menu_save_results(result_data)
# Sections menu
# Sections menu
def menu_sections():
    """
    Top-level interactive menu: list the available 9GAG sections, let the
    user pick one (or quit with 'q'), then process it via menu_articles.
    """
    global _sections
    if _sections is None:
        _sections = get_sections()
    print("[INFO] These are the categories available for 9GAG data: ")
    # BUG FIX: dict.keys() is a non-indexable view on Python 3, so the old
    # `_sections.keys()[c]` raised TypeError -- silently eaten by the bare
    # except, which made every valid choice print "Invalid choice" forever.
    # Materialize the key order once and reuse it for display and lookup.
    section_names = list(_sections)
    for i, sec in enumerate(section_names):
        print(" %i. %s (url: %s)" % (i, sec, _sections[sec]))
    print("")
    invalid_choice = True
    while invalid_choice:
        choice = input("> Choose one of the sections [0-%i], or enter 'q' to quit: " % (len(_sections) - 1))
        if choice.lower() == 'q':
            print("[INFO] Exiting...")
            exit(0)
        try:
            c = int(choice)
            section_name = section_names[c]
            section_url = _sections[section_name]
            invalid_choice = False
        except (ValueError, IndexError):
            # Narrowed from a bare except: bad number or out-of-range index.
            print('[ERROR] Invalid choice! Try again...')
        else:
            print("[INFO] Processing section '%s'... \n" % section_name)
            menu_articles(section_url)
            print("")
            reload_menu_sections()
# Menu definition
# Menu definition
# Maps a (lowercased) user choice to its handler; consulted by exec_menu.
menu_actions = {
    'main_menu': main_menu,
    # Placeholder entries kept from the original template; the referenced
    # handlers (menu1, menu2, back) are not defined in this script.
    #'1': menu1,
    #'2': menu2,
    #'9': back,
    #'0': exit,
}
if __name__ == '__main__':
    # Entry point: start the interactive sections menu.
    menu_sections()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment