Created
September 22, 2017 05:27
-
-
Save leferrad/426d80110a16c1de4bcf02b3ada7b1bf to your computer and use it in GitHub Desktop.
Extract images from 9GAG through a simple interactive main in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
"""Script to get images from 9gag, based on https://github.com/sashgorokhov/python-ninegag""" | |
# Python 2 and 3 compatibility
from __future__ import print_function

import functools
import os
import re
import time

from builtins import input
from bs4 import BeautifulSoup
import requests

try:
    import urlparse
except ImportError:
    from urllib import parse as urlparse
# Constant values
# Posts whose rendered height exceeds this are classified as 'longpost'.
LONGPOST_HEIGHT_MIN = 1000
# Root URL of the site; relative links are resolved against it.
BASE_URL = 'http://9gag.com/'
# Cache variables
# Lazily-filled {section name: url} mapping (populated by menu_sections).
_sections = None
# Per-article memo of (insertion timestamp, parsed data), keyed by article id.
_cache = dict()  # type: dict[str, tuple[int, dict]]
# --- Exception classes --- | |
class NineGagError(Exception):
    """Base class for every error raised by this 9gag scraper."""
class UnknownArticleType(NineGagError):
    """Error for articles whose media type is not recognized (currently unused here)."""
class NotSafeForWork(NineGagError):
    """Raised when a post exposes no public container tag (NSFW-gated content)."""
# -------------------------- | |
# --- Extraction functions --- | |
def _bs_from_response(html):
    """
    Parse a raw HTML string into a BeautifulSoup tree.
    :param html: str
    :return: bs4.BeautifulSoup
    """
    parser = "html.parser"  # stdlib parser: no extra dependency required
    return BeautifulSoup(html, parser)
def _bs_from_url(url):
    """
    Fetch ``url`` and parse the response body into BeautifulSoup.
    Shortcut combining requests.get with _bs_from_response.
    :param url: str
    :return: bs4.BeautifulSoup
    """
    response = requests.get(url)
    return _bs_from_response(response.text)
def get_sections():
    """
    Scrape the 9gag home page and return a {section name: section url} dict.
    :rtype: dict
    """
    bs = _bs_from_url(BASE_URL)
    # First/last <li> of the primary nav are not sections; drop them.
    menu_items = bs.find(attrs='nav-menu').find(attrs='primary').find_all('li')[1:-1]
    menu_items.extend(bs.find_all(attrs="badge-section-menu-items"))
    return {item.a.text.strip(): item.a['href'] for item in menu_items}
def _get_gif(container):
    """
    Extract a gif result from a post container, if present.
    :param bs4.Tag container:
    :return: {'url': <source gif url>, 'type': 'gif'}, or None when the post is not a gif.
    :rtype: dict|None
    """
    animated = container.find(attrs='badge-animated-container-animated')
    # Keep truthiness test (not an `is None` check): mirrors the original contract.
    if not animated:
        return None
    return {'url': animated['data-image'], 'type': 'gif'}
def _get_image(container):
    """
    Extract an image result from a post container, if present.

    Returns a dict with key ``url`` (source image URL) and ``type``:
    'image' when the rendered height is at most LONGPOST_HEIGHT_MIN,
    'longpost' otherwise.

    :param bs4.Tag container:
    :rtype: dict|None
    """
    tag = container.find(attrs='badge-item-img')
    if not tag:
        return None
    # The listing page inlines the rendered height inside the anchor's style
    # attribute; pull out the first numeric value found there.
    style = container.a['style']
    match = re.search(r'[\d\.]+', style)
    if not match:
        # Fix: was a bare `return`; make the None explicit for consistency.
        return None
    height = float(match.group())
    # Renamed from `type` to avoid shadowing the builtin.
    media_type = 'image'
    if height > LONGPOST_HEIGHT_MIN:
        media_type = 'longpost'
    # Fetch the full post page to obtain the direct source-image URL.
    url = urlparse.urljoin(BASE_URL, container.a['href'])
    bs = _bs_from_url(url)
    tag = bs.find(attrs='badge-item-img')
    if tag is None:
        # Fix: guard against a missing image tag on the post page (layout
        # change / removed post) instead of raising TypeError on tag['src'].
        return None
    return {'url': tag['src'], 'type': media_type}
def _cache_article(func):
    """
    Memoize ``func`` results per article id in the module-level ``_cache``.

    Hand-rolled because functools.lru_cache is not compatible with Python 2.7.
    Entries are timestamped; once the cache exceeds 100 entries, the 20 with
    the oldest insertion timestamps are evicted.

    :param func: callable taking an ``article`` object that supports
        ``article['id']`` lookup.
    :rtype: callable
    """
    @functools.wraps(func)  # fix: preserve the wrapped function's metadata
    def wrapper(article):
        article_id = article['id']
        if article_id in _cache:
            return _cache[article_id][1]
        data = func(article=article)
        _cache[article_id] = (time.time(), data)
        if len(_cache) > 100:
            # Evict the 20 oldest entries by insertion timestamp.
            oldest = sorted(_cache, key=lambda key: _cache[key][0])[:20]
            for key in oldest:
                _cache.pop(key)
        return data
    return wrapper
@_cache_article
def _get_data(article):
    """
    Return the media payload of an article as a dict with keys ``url`` and ``type``.
    :param bs4.Tag article:
    :raises NotSafeForWork: when the post exposes no public container tag.
    :rtype: dict|None
    """
    container = article.find(attrs='badge-post-container')
    if container is None:
        raise NotSafeForWork()
    # Try gif extraction first, then fall back to image extraction.
    result = _get_gif(container)
    if result is None:
        result = _get_image(container)
    return result
def _paginated_url(url, max_pages=1):
    """
    Yield (article_id, article_html) pairs from 9gag's JSON listing endpoint,
    following the 'loadMoreUrl' pagination link between pages.
    :param str url: listing URL to start from.
    :param int|None max_pages: how many pages of results to parse. If None - all available. Default 1 - only first page.
    :rtype: collections.Iterable[tuple[str, dict]]
    """
    pages_done = 0
    headers = {'Accept': 'application/json', 'X-Requested-With': 'XMLHttpRequest'}
    while max_pages is None or (max_pages and pages_done < max_pages):
        pages_done += 1
        payload = requests.get(url, headers=headers).json()
        ids = payload['ids']
        if not ids:
            break
        url = urlparse.urljoin(BASE_URL, payload['loadMoreUrl'])
        for article_id in ids:
            yield article_id, payload['items'][article_id]
def _parse_article(article):
    """
    Parse one article tag into a dict with keys: id, url, votes, comments, title, data.
    Returns None for NSFW posts and for articles of an unknown media type.
    :param bs4.Tag article:
    :rtype: dict|None
    """
    parsed = {
        'id': article['data-entry-id'],
        'url': article['data-entry-url'],
        'votes': article['data-entry-votes'],
        'comments': article['data-entry-comments'],
        'title': article.find(attrs='badge-item-title').a.text.strip(),
    }
    try:
        parsed['data'] = _get_data(article)
    except NotSafeForWork:
        print('[DEBUG] NSFW Post: {} {}'.format(parsed['id'], parsed['url']))
        return None
    if not parsed['data']:
        print('[WARNING] Unknown article type of {}: {}'.format(parsed['id'], parsed['url']))
        return None
    return parsed
def get_articles(url, max_pages=1, raise_on_error=False):
    """
    Yield parsed article dicts for every article found at the given url.
    :param str url:
    :param int|None max_pages: how many pages of results to parse. If None - all available. Default 1 - only first page.
    :param bool raise_on_error: re-raise parsing exceptions instead of only logging them.
    :rtype: collections.Iterable[dict]
    """
    for article_id, article_html in _paginated_url(url, max_pages=max_pages):
        try:
            parsed = _parse_article(_bs_from_response(article_html).article)
            if not parsed:
                print("[DEBUG] Empty data for article '%s'" % str(article_id))
                continue
            yield parsed
        except Exception as e:
            # Top-level boundary: log the failing article and keep going
            # unless the caller asked for failures to propagate.
            print("[ERROR] Error while parsing article '%s': %s" % (str(article_id), str(e)))
            if raise_on_error:
                raise
# ---------------------------- | |
# ----- Utils ----- | |
def get_filename(n, max_n, preffix='img_', extension='jpg'):
    """
    Build a zero-padded file name such as ``img_007.jpg``.

    ``n`` is left-padded with zeros to the decimal width of ``max_n`` so that
    generated names sort lexicographically in numeric order.

    :param int n: index of the current file.
    :param int max_n: largest index that will be used; defines the padding width.
    :param str preffix: file-name prefix (parameter name kept, though misspelled,
        for backward compatibility).
    :param str extension: file extension without the leading dot.
    :raises ValueError: if n/max_n are not ints or n > max_n.
    :rtype: str
    """
    # Fix: validate with explicit raises — `assert` is stripped under `-O`,
    # and the original's ValueError instances were only assert messages.
    if not (isinstance(n, int) and isinstance(max_n, int)):
        raise ValueError("Arguments 'n' and 'max_n' must be integers!")
    if n > max_n:
        raise ValueError("Argument 'n' must be less or equal than 'max_n'")
    width = len(str(max_n))
    return '{}{}.{}'.format(preffix, str(n).zfill(width), extension)
# ======================= | |
# MENUS FUNCTIONS | |
# ======================= | |
# Main menu | |
def main_menu():
    """
    Clear the screen, greet the user and dispatch their typed choice.

    Fix: the original called ``raw_input``, which raises NameError on
    Python 3; ``input`` (backported for Python 2 via ``builtins``) works
    on both interpreters.
    """
    os.system('clear')
    print("Welcome,\n")
    choice = input(" >> ")
    exec_menu(choice)
# Execute menu | |
def exec_menu(choice):
    """
    Dispatch ``choice`` through the module-level ``menu_actions`` table.
    An empty choice re-enters the main menu; an unknown one warns and
    re-enters the main menu as well.
    """
    os.system('clear')
    key = choice.lower()
    if not key:
        key = 'main_menu'  # empty input defaults to the main menu
    if key in menu_actions:
        menu_actions[key]()
    else:
        print("Invalid selection, please try again.\n")
        menu_actions['main_menu']()
def reload_menu_sections():
    """Ask whether to process another section; on 'y' re-enter menu_sections."""
    while True:
        answer = input("> Do you want to process another section? (y/N) ")
        if answer == '' or answer.lower() == 'n':
            # Default: no
            return
        if answer.lower() == 'y':
            # Reload this menu
            menu_sections()
            return
        print('[ERROR] Invalid choice! Try again...')
def save_on_disk(r, path):
    """Write raw response bytes ``r`` into the file at ``path`` (binary mode)."""
    with open(path, 'wb') as out_file:
        out_file.write(r)
def get_data_from_url(url):
    """Download and return the raw bytes served at ``url``."""
    response = requests.get(url)
    return response.content
def confirm_directory(directory):
    """
    Ensure ``directory`` exists, asking the user before creating it.

    :param str directory: path to check and possibly create.
    :return: True when the directory exists or was created, False otherwise
        (user declined, or creation failed).
    :rtype: bool
    """
    if os.path.exists(directory):
        print("[INFO] Using existing directory '%s'" % directory)
        return True
    create = input("> Directory '%s' does not exist. Do you want to create it? [Y/n]" % directory)
    if create == '' or create.lower() == 'y':
        try:
            os.makedirs(directory)
            return True
        except OSError:
            # Fix: was a bare `except:` that swallowed everything (including
            # KeyboardInterrupt); filesystem failures raise OSError.
            return False
    # User declined creation.
    return False
def menu_save_results(results):
    """
    Ask the user for an output directory and download every result URL into it.
    Falls back to '/tmp/imgs' when the chosen directory cannot be confirmed.
    :param results: list of URLs to download.
    """
    directory = input("> Enter an output directory for the results: ")
    if not confirm_directory(directory):
        # Let a default directory
        directory = '/tmp/imgs'
        if not os.path.exists(directory):
            os.mkdir(directory)
        print("[ERROR] Error trying to create directory! Using default: '%s'" % directory)
    total = len(results)
    for index, url in enumerate(results):
        # File extension is taken from the last dot-separated URL segment.
        name = get_filename(index, total, preffix='img_', extension=url.split('.')[-1])
        destination = os.path.join(directory, name)
        print("[INFO] Saving result %i of %i into '%s'..." % (index + 1, total, destination))
        save_on_disk(get_data_from_url(url), destination)
def menu_articles(section_url):
    """
    Interactive menu for one section: ask for a maximum item count and a
    result type, collect matching results from ``section_url`` and hand
    them to the save menu.

    :param str section_url: URL of the 9gag section to scrape.
    """
    # Default value
    max_items = 10
    invalid_choice = True
    while invalid_choice:
        choice = input("> Enter the max of items to download [default=%i]: " % max_items)
        if choice == '':
            # Default value already set
            break
        try:
            max_items = int(choice)
            invalid_choice = False
        except ValueError:
            # Fix: was a bare `except:`; int() failures raise ValueError.
            print('[ERROR] Invalid choice! Try again...')
    data = get_articles(section_url, max_pages=None)  # Get all available pages and stop when a max of items is achieved
    result_types = ['gif', 'image', 'gif or image']
    print("[INFO] These are the supported types of result which can be downloaded: ")
    for i, res_t in enumerate(result_types):
        print(" %i. %s" % (i, res_t))
    print("")
    # Default value
    r_type = 'image'
    invalid_choice = True
    while invalid_choice:
        choice = input("> Select the type of result desired [default='%s']: " % r_type)
        if choice == '':
            # Default value already set
            break
        try:
            r_type = result_types[int(choice)]
            invalid_choice = False
        except (ValueError, IndexError):
            # Fix: was a bare `except:`; non-numeric input raises ValueError,
            # an out-of-range index raises IndexError.
            print('[ERROR] Invalid choice! Try again...')
    result_data = []
    # Predicate per result type; results failing the predicate are skipped.
    accepted_result = {'gif': lambda d: d['type'] == 'gif',
                       'image': lambda d: d['type'] == 'image',
                       'gif or image': lambda d: d['type'] == 'gif' or d['type'] == 'image'}
    for dd in data:
        if len(result_data) >= max_items:
            break
        print("[DEBUG] Processing a result...")
        if accepted_result[r_type](dd['data']):
            print("[DEBUG] -> Result belongs to the chosen type. Caching it...")
            # TODO: what if we also extract the title?
            result_data.append(dd['data']['url'])
        else:
            print("[DEBUG] -> Result does not belong to the chosen type. Ignoring it...")
    print("")
    print("[INFO] There are %i results extracted." % len(result_data))
    menu_save_results(result_data)
# Sections menu | |
def menu_sections():
    """
    Interactive top-level menu: list the available 9gag sections and process
    the one chosen by the user, then offer to process another.
    """
    global _sections
    if _sections is None:
        _sections = get_sections()  # fetched once, cached at module level
    print("[INFO] These are the categories available for 9GAG data: ")
    for i, (sec, url) in enumerate(_sections.items()):
        print(" %i. %s (url: %s)" % (i, sec, url))
    print("")
    invalid_choice = True
    while invalid_choice:
        choice = input("> Choose one of the sections [0-%i], or enter 'q' to quit: " % (len(_sections) - 1))
        if choice.lower() == 'q':
            print("[INFO] Exiting...")
            exit(0)
        try:
            c = int(choice)
            # Fix: dict.keys() is not subscriptable on Python 3; materialize
            # the keys into a list before indexing.
            section_name = list(_sections)[c]
            section_url = _sections[section_name]
            invalid_choice = False
        except (ValueError, IndexError):
            # Fix: was a bare `except:`; covers non-numeric and out-of-range input.
            print('[ERROR] Invalid choice! Try again...')
        else:
            print("[INFO] Processing section '%s'... \n" % section_name)
            menu_articles(section_url)
            print("")
            reload_menu_sections()
# Menu definition | |
menu_actions = { | |
'main_menu': main_menu, | |
#'1': menu1, | |
#'2': menu2, | |
#'9': back, | |
#'0': exit, | |
} | |
# Entry point: start at the sections menu when run as a script.
if __name__ == '__main__':
    menu_sections()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment