Skip to content

Instantly share code, notes, and snippets.

@EngineerCoding
Created December 17, 2017 14:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save EngineerCoding/f68ecd48f4b2978cfa3bf08f31cd2582 to your computer and use it in GitHub Desktop.
Save EngineerCoding/f68ecd48f4b2978cfa3bf08f31cd2582 to your computer and use it in GitHub Desktop.
from tag_filter import TagFilter
import requests
import os
import utils
class DataProvider(object):
    """Base class for article sources.

    A provider optionally carries a tag filter, may run a preparation step
    before providing, and writes its output as raw HTML files into a
    directory chosen by the caller.
    """

    def __init__(self, *args, tag_filter=None, **kwargs):
        """Forward the remaining arguments up the MRO and keep the filter.

        :param tag_filter: stored unchanged on ``self.tag_filter``;
            defaults to ``None``.
        """
        super().__init__(*args, **kwargs)
        self.tag_filter = tag_filter

    def prepare_providing(self, full_scrape=False):
        """Hook that runs before providing; the base version does nothing.

        :param full_scrape: ignored by the base implementation.
        """
        return None

    def get_name_provider(self):
        """Return the provider name, which defaults to the class name."""
        return type(self).__name__

    def write_raw_html(self, in_dir):
        """Write the provider's HTML into ``in_dir``.

        :raises NotImplementedError: always; subclasses supply the logic.
        """
        raise NotImplementedError
class Jinja2Mixin(object):
    """Mixin that renders article data to HTML files with a Jinja2 template.

    The concrete class must define ``JINJA_TEMPLATE`` (a string path to the
    template file) and override :meth:`article_data_provider`, which must
    yield ``(file_name, data_dict)`` pairs.
    """

    # Jinja2 environments keyed by absolute template directory, shared by
    # all instances so each directory gets a single Environment.
    _LOADER_CACHE = dict()

    def __init__(self, *args, style_file=None, **kwargs):
        """Load the template and validate the optional style file.

        :param style_file: path to a stylesheet; when given it must point
            to an existing file. Falsy values leave ``self.style_file`` as
            ``None``.
        :raises FileNotFoundError: if ``style_file`` is given but is not an
            existing file.
        """
        super().__init__(*args, **kwargs)
        self.template = self._load_jinja_template()
        self.style_file = None
        if style_file:
            if not os.path.isfile(style_file):
                # The message is formatted with the actual argument; the
                # previous code referenced an undefined name here, which
                # turned this error into a NameError.
                raise FileNotFoundError(
                    'Style file \'{}\' does not exist!'.format(style_file))
            self.style_file = style_file

    def _load_jinja_template(self):
        """Return the template named by ``JINJA_TEMPLATE``.

        The Environment for the template's directory is created on first
        use and then reused from ``_LOADER_CACHE``.
        """
        dir_path = self._get_template_dir_path()
        if dir_path not in Jinja2Mixin._LOADER_CACHE:
            # Imported lazily so jinja2 is only needed once a template is
            # actually loaded.
            from jinja2 import Environment, FileSystemLoader
            Jinja2Mixin._LOADER_CACHE[dir_path] = Environment(
                loader=FileSystemLoader(dir_path), trim_blocks=True)
        return Jinja2Mixin._LOADER_CACHE[dir_path].get_template(
            os.path.basename(self.JINJA_TEMPLATE))

    def _get_template_dir_path(self):
        """Return the absolute directory that contains ``JINJA_TEMPLATE``.

        :raises AttributeError: if ``JINJA_TEMPLATE`` is not defined.
        :raises ValueError: if ``JINJA_TEMPLATE`` is not a string.
        """
        if not hasattr(self, 'JINJA_TEMPLATE'):
            raise AttributeError('JINJA_TEMPLATE is not set on class')
        template = self.JINJA_TEMPLATE
        if not isinstance(template, str):
            raise ValueError('JINJA_TEMPLATE is not a string')
        return os.path.dirname(os.path.abspath(template))

    def article_data_provider(self):
        """Yield ``(file_name, data_dict)`` pairs; must be overridden.

        :raises NotImplementedError: always in this mixin.
        """
        raise NotImplementedError

    def write_raw_html(self, dir):
        """Render every provided article into a file inside ``dir``.

        Each data dict gets a ``'stylesheet'`` entry set to
        ``self.style_file`` before it is passed to the template.
        """
        template = self._load_jinja_template()
        for file_name, data in self.article_data_provider():
            data['stylesheet'] = self.style_file
            template.stream(**data).dump(os.path.join(dir, file_name))
class DevToProvider(Jinja2Mixin, DataProvider):
    """Provider that collects articles from the dev.to JSON endpoints.

    ``prepare_providing`` gathers article URLs into ``self.urls``;
    ``article_data_provider`` then downloads each one and yields the data
    for the template.
    """

    OVERVIEW_PAGE_URL = 'https://dev.to/api/articles?page={}'
    ARTICLE_URL = 'https://dev.to/api/articles/{}'
    USER_URL = 'https://dev.to/{}'
    TAG_URL = 'https://dev.to/t/{}'
    JINJA_TEMPLATE = 'templates/dev.to.html.j2'

    def __init__(self, *args, **kwargs):
        """Initialise the base classes and start with no queued URLs."""
        super().__init__(*args, **kwargs)
        self.urls = []

    def get_name_provider(self):
        """Return the fixed provider name ``"dev.to"``."""
        return "dev.to"

    @staticmethod
    def get_file_name(article_json):
        """Build the output file name ``[<author> - ]<title>.html``.

        The author prefix is only added when the user's name is truthy;
        the result is passed through ``utils.safe_file_name``.
        """
        name = article_json['title'] + ".html"
        author = article_json['user']['name']
        if author:
            name = author + " - " + name
        return utils.safe_file_name(name)

    @staticmethod
    def convert_data(web_json):
        """Map an article JSON object onto the dict used for rendering."""
        author = {
            'name': web_json['user']['name'],
            'url': DevToProvider.USER_URL.format(
                web_json['user']['username'])
        }
        article_data = {
            'cover_image': web_json['cover_image'],
            'title': web_json['title'],
            'published_date': web_json['published_at'],
            'article': web_json['body_html'],
            'tags': [],
            'user': author
        }
        # Every tag becomes a label plus a link to its tag page.
        article_data['tags'].extend(
            {'label': tag, 'url': DevToProvider.TAG_URL.format(tag)}
            for tag in web_json['tag_list'])
        return article_data

    def prepare_overview(self, page):
        """Queue the articles of one overview page that have no file yet.

        :returns: tuple ``(articles on the page, articles newly queued)``.
        """
        response = requests.get(DevToProvider.OVERVIEW_PAGE_URL.format(page))
        articles = response.json()
        queued = 0
        for entry in articles:
            target = os.path.join(
                'raw/dev.to', DevToProvider.get_file_name(entry))
            if os.path.isfile(target):
                # A file with this name already exists; skip the article.
                continue
            self.urls.append(DevToProvider.ARTICLE_URL.format(entry['id']))
            queued += 1
        return len(articles), queued

    def prepare_providing(self, full_scrape=False):
        """Walk the overview pages, starting at page 1.

        Paging stops at the first empty page. Unless ``full_scrape`` is
        set it also stops after the first page on which not every article
        was newly queued.
        """
        page = 1
        while True:
            length, added = self.prepare_overview(page)
            if length == 0:
                break
            if not full_scrape and added != length:
                break
            page += 1

    def article_data_provider(self):
        """Yield ``(file_name, article_data)`` for each queued URL.

        When a tag filter is set, articles whose ``tag_list`` fails
        ``check_tags`` are skipped.
        """
        for url in self.urls:
            article = requests.get(url).json()
            if self.tag_filter and not self.tag_filter.check_tags(
                    article['tag_list']):
                continue
            converted = DevToProvider.convert_data(article)
            yield DevToProvider.get_file_name(converted), converted
class ArticleProcessor(object):
    """Base class for objects that process article data."""

    # Declared as a staticmethod because the original signature has no
    # ``self``: called on an instance it raised TypeError instead of the
    # intended NotImplementedError. Calls through the class are unchanged.
    @staticmethod
    def process_article_data(article_data):
        """Process ``article_data``; must be overridden.

        :raises NotImplementedError: always in this base class.
        """
        raise NotImplementedError
class PDFArticleProcessor(ArticleProcessor):
    """Article processor placeholder; no behaviour is implemented yet.

    NOTE(review): the name suggests PDF output, but nothing here does it.
    The docstring doubles as the class body, since a bare class header is
    a syntax error.
    """
from jinja2 import Environment, FileSystemLoader
import pdfkit
from time import sleep
import os
class Scraper(object):
    """Run registered data providers and store their raw HTML output.

    On construction the base directories ``raw`` and ``article`` are
    created in the current working directory. Each registered provider
    gets a sub-directory named after it inside both.
    """

    def __init__(self, tag_filter=None):
        """Create the base directories and remember the tag filter.

        :param tag_filter: passed to every provider that is registered
            through :meth:`add_data_provider`.
        """
        self.tag_filter = tag_filter
        self.data_providers = []
        self.dirs = ["raw", "article"]
        Scraper.create_dirs(self.dirs)

    @staticmethod
    def create_dirs(dirs):
        """Make sure every path in ``dirs`` exists as a directory.

        ``os.makedirs(..., exist_ok=True)`` avoids the check-then-create
        race of ``isdir`` + ``mkdir`` and also creates missing parents.

        :param dirs: iterable of directory paths.
        """
        for path in dirs:
            os.makedirs(path, exist_ok=True)

    def add_data_provider(self, cls):
        """Instantiate ``cls`` with the tag filter and register it.

        :param cls: provider class; it is called with a ``tag_filter``
            keyword and must offer ``get_name_provider()``.
        """
        ins = cls(tag_filter=self.tag_filter)
        provider_name = ins.get_name_provider()
        Scraper.create_dirs(
            [os.path.join(base, provider_name) for base in self.dirs])
        self.data_providers.append(ins)

    def retrieve_data(self, full_scrape=False):
        """Let every provider prepare and write HTML to ``raw/<name>``.

        :param full_scrape: forwarded to ``prepare_providing``.
        """
        for instance in self.data_providers:
            instance.prepare_providing(full_scrape)
            base_path = os.path.join("raw", instance.get_name_provider())
            instance.write_raw_html(base_path)

    def timed_loop(self, timeout=3600, n=-1):
        """Call :meth:`retrieve_data` repeatedly with pauses in between.

        :param timeout: seconds to sleep between two runs.
        :param n: number of runs; a negative value loops forever and
            ``0`` does nothing. No sleep follows the final run of a
            finite loop.
        """
        if n < 0:
            while True:
                self.retrieve_data()
                sleep(timeout)
        for i in range(n):
            self.retrieve_data()
            if i != n - 1:
                sleep(timeout)

    # TODO: an earlier, disabled draft (main_loop) rendered every article
    # to "raw/<name>.html" and then converted it with pdfkit.from_file(...)
    # into "articles/<name>.pdf" using options {"quiet": ""}. Presumably
    # the "article" directory created above is meant for that output.
if __name__ == "__main__":
    from argparse import ArgumentParser
    from data_provider import DevToProvider

    # Script entry point: register the dev.to provider and do one full
    # scrape right away.
    scraper = Scraper()
    scraper.add_data_provider(DevToProvider)
    scraper.retrieve_data(full_scrape=True)

    # Wait an hour, then hand over to the timed loop with its defaults.
    sleep(3600)
    scraper.timed_loop()
class TagFilter(object):
    """Accept tag collections that hold all required and no banned tags."""

    def __init__(self, required_tags=None, blacklisted_tags=None):
        """Store both tag collections as sets.

        :param required_tags: iterable of tags that must all be present;
            ``None`` (the default) means no requirement.
        :param blacklisted_tags: iterable of tags that must not be
            present; ``None`` (the default) means nothing is banned.
        """
        # None replaces the former mutable ``[]`` defaults.
        self.required_tags = set(
            () if required_tags is None else required_tags)
        self.blacklisted_tags = set(
            () if blacklisted_tags is None else blacklisted_tags)

    def check_tags(self, tags):
        """Return ``True`` when ``tags`` passes the filter.

        :param tags: iterable of hashable tags. It is materialised once,
            so a one-shot iterator is also checked against the blacklist
            instead of being exhausted by the required-tags test.
        """
        present = set(tags)
        return (self.required_tags.issubset(present)
                and self.blacklisted_tags.isdisjoint(present))
from sys import platform
from os import sep
def safe_file_name(file_name):
    """Return ``file_name`` with problematic characters substituted.

    On ``win32`` a fixed set of characters is replaced by textual
    stand-ins first. On every platform the path separator is then
    replaced by ``'-'``.
    """
    if platform == 'win32':
        # Substitutions applied on Windows only.
        substitutes = {'<': '&lt;', '>': '&gt;', ':': ';', '|': ' pipe ',
                       '/': ' forward slash ', '?': ' question mark ',
                       '*': ' asterisk ', '"': '\''}
        file_name = ''.join(substitutes.get(ch, ch) for ch in file_name)
    return file_name.replace(sep, '-')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment