-
-
Save EngineerCoding/f68ecd48f4b2978cfa3bf08f31cd2582 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from tag_filter import TagFilter | |
import requests | |
import os | |
import utils | |
class DataProvider:
    """Base class for article sources.

    Stores an optional tag filter and declares the hooks a concrete
    provider fills in: ``prepare_providing`` and ``write_raw_html``.
    """

    def __init__(self, *args, tag_filter=None, **kwargs):
        """Forward remaining arguments up the MRO and keep ``tag_filter``."""
        super().__init__(*args, **kwargs)
        self.tag_filter = tag_filter

    def prepare_providing(self, full_scrape=False):
        """Hook run before articles are written; does nothing by default."""
        return None

    def get_name_provider(self):
        """Return the provider's name, which defaults to the class name."""
        return type(self).__name__

    def write_raw_html(self, in_dir):
        """Write the raw HTML files into ``in_dir``; subclasses implement it.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
class Jinja2Mixin(object):
    """Mixin that renders article data to HTML files with a Jinja2 template.

    The host class must define a ``JINJA_TEMPLATE`` string holding the path
    of the template file and must implement ``article_data_provider``.
    """

    # Jinja2 environments keyed by template directory; stored on the class
    # so every instance using the same directory shares one environment.
    _LOADER_CACHE = dict()

    def __init__(self, *args, style_file=None, **kwargs):
        """Validate the optional style file and load the template.

        Args:
            style_file: optional path of a stylesheet; it is handed to the
                template as ``stylesheet`` for every rendered article.

        Raises:
            FileNotFoundError: if ``style_file`` is given but is not a file.
        """
        super().__init__(*args, **kwargs)
        self.style_file = None
        # Check the cheap, user-supplied input before loading the template.
        if style_file:
            if not os.path.isfile(style_file):
                raise FileNotFoundError('Style file \'{}\' does not exist!'
                                        .format(style_file))
            self.style_file = style_file
        self.template = self._load_jinja_template()

    def _load_jinja_template(self):
        """Return the template named by ``JINJA_TEMPLATE``.

        The environment for the template's directory is created on first
        use and then reused from ``_LOADER_CACHE``.
        """
        dir_path = self._get_template_dir_path()
        if dir_path not in Jinja2Mixin._LOADER_CACHE:
            # Imported lazily so jinja2 is only needed once a template is
            # actually loaded.
            from jinja2 import Environment, FileSystemLoader
            Jinja2Mixin._LOADER_CACHE[dir_path] = Environment(
                loader=FileSystemLoader(dir_path), trim_blocks=True)
        return Jinja2Mixin._LOADER_CACHE[dir_path].get_template(
            os.path.basename(self.JINJA_TEMPLATE))

    def _get_template_dir_path(self):
        """Return the absolute directory that contains ``JINJA_TEMPLATE``.

        Raises:
            AttributeError: if ``JINJA_TEMPLATE`` is not defined.
            ValueError: if ``JINJA_TEMPLATE`` is not a string.
        """
        if not hasattr(self, 'JINJA_TEMPLATE'):
            raise AttributeError('JINJA_TEMPLATE is not set on class')
        template = self.JINJA_TEMPLATE
        if not isinstance(template, str):
            raise ValueError('JINJA_TEMPLATE is not a string')
        return os.path.dirname(os.path.abspath(template))

    def article_data_provider(self):
        """Yield ``(file_name, data)`` pairs; subclasses implement it.

        Raises:
            NotImplementedError: always, in this mixin.
        """
        raise NotImplementedError

    def write_raw_html(self, dir):
        """Render every provided article into a file inside ``dir``.

        Args:
            dir: directory that receives one rendered file per article.
        """
        template = self._load_jinja_template()
        for file_name, data in self.article_data_provider():
            data['stylesheet'] = self.style_file
            template.stream(**data).dump(os.path.join(dir, file_name))
class DevToProvider(Jinja2Mixin, DataProvider):
    """Data provider that collects articles through the dev.to JSON API."""

    OVERVIEW_PAGE_URL = 'https://dev.to/api/articles?page={}'
    ARTICLE_URL = 'https://dev.to/api/articles/{}'
    USER_URL = 'https://dev.to/{}'
    TAG_URL = 'https://dev.to/t/{}'
    JINJA_TEMPLATE = 'templates/dev.to.html.j2'
    # Seconds to wait for a response; without a timeout a stalled
    # connection would block the scraper indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, *args, **kwargs):
        """Initialise the bases and start with an empty download queue."""
        super().__init__(*args, **kwargs)
        # API URLs of the articles queued by the latest prepare_providing().
        self.urls = []

    def get_name_provider(self):
        """Return the fixed provider name ``"dev.to"``."""
        return "dev.to"

    @staticmethod
    def _fetch_json(url):
        """GET ``url`` and return the decoded JSON body.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        response = requests.get(url, timeout=DevToProvider.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _normalize_tags(tag_list):
        """Return ``tag_list`` as a list of tag strings.

        NOTE(review): the API appears to return ``tag_list`` as a list on
        the overview endpoint and as a comma-separated string on the
        single-article endpoint - confirm against the API docs. Both shapes
        are accepted here; a list is passed through unchanged.
        """
        if isinstance(tag_list, str):
            return [tag.strip() for tag in tag_list.split(',') if tag.strip()]
        return list(tag_list or [])

    @staticmethod
    def get_file_name(article_json):
        """Build the output file name ``"<user name> - <title>.html"``.

        The user-name prefix is left out when the name is empty. The result
        is passed through ``utils.safe_file_name``.
        """
        file_name = article_json['title'] + ".html"
        if article_json['user']['name']:
            file_name = article_json['user']['name'] + " - " + file_name
        return utils.safe_file_name(file_name)

    @staticmethod
    def convert_data(web_json):
        """Map an article's API JSON onto the dict handed to the template.

        Returns:
            dict with the keys ``cover_image``, ``title``,
            ``published_date``, ``article``, ``tags`` (a list of
            ``{'label', 'url'}`` dicts) and ``user`` (``{'name', 'url'}``).
        """
        tags = DevToProvider._normalize_tags(web_json['tag_list'])
        return {
            'cover_image': web_json['cover_image'],
            'title': web_json['title'],
            'published_date': web_json['published_at'],
            'article': web_json['body_html'],
            'tags': [
                dict(label=tag, url=DevToProvider.TAG_URL.format(tag))
                for tag in tags
            ],
            'user': {
                'name': web_json['user']['name'],
                'url': DevToProvider.USER_URL.format(
                    web_json['user']['username'])
            }
        }

    def prepare_overview(self, page):
        """Queue the not-yet-saved articles listed on overview page ``page``.

        An article counts as saved when its file already exists in
        ``raw/dev.to``.

        Returns:
            tuple ``(articles_on_page, articles_queued)``.
        """
        overview = DevToProvider._fetch_json(
            DevToProvider.OVERVIEW_PAGE_URL.format(page))
        amount_added = 0
        for article in overview:
            file_name = DevToProvider.get_file_name(article)
            if not os.path.isfile(os.path.join('raw/dev.to', file_name)):
                self.urls.append(
                    DevToProvider.ARTICLE_URL.format(article['id']))
                amount_added += 1
        return len(overview), amount_added

    def prepare_providing(self, full_scrape=False):
        """Rebuild the download queue from the overview pages.

        Pages are walked from page 1 until an empty page is returned.
        Unless ``full_scrape`` is true, walking also stops after the first
        page on which not every article was queued.
        """
        # Start from a clean queue: URLs left over from an earlier run would
        # otherwise be downloaded again on every call.
        self.urls = []
        page = 1
        length, added = self.prepare_overview(page)
        while length != 0 and (full_scrape or added == length):
            page += 1
            length, added = self.prepare_overview(page)

    def article_data_provider(self):
        """Yield ``(file_name, article_data)`` for every queued article.

        Articles rejected by ``self.tag_filter`` (when one is set) are
        skipped.
        """
        for url in self.urls:
            article = DevToProvider._fetch_json(url)
            tags = DevToProvider._normalize_tags(article['tag_list'])
            if self.tag_filter and not self.tag_filter.check_tags(tags):
                continue
            article_data = DevToProvider.convert_data(article)
            yield DevToProvider.get_file_name(article_data), article_data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ArticleProcessor(object):
    """Base class for article processors."""

    def process_article_data(self, article_data):
        """Process one article's data; subclasses implement it.

        Args:
            article_data: the article data to process.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class PDFArticleProcessor(ArticleProcessor):
    """Article processor whose body so far consists only of imports.

    NOTE(review): because the imports sit in the class body,
    ``Environment``, ``FileSystemLoader`` and ``pdfkit`` become class
    attributes. No processing method is defined in the visible source.
    """

    from jinja2 import Environment
    from jinja2 import FileSystemLoader
    import pdfkit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import sleep | |
import os | |
class Scraper(object):
    """Runs registered data providers and stores their output on disk."""

    def __init__(self, tag_filter=None):
        """Remember the tag filter and create the base output directories.

        Args:
            tag_filter: optional filter object handed to every provider
                that is registered through ``add_data_provider``.
        """
        self.tag_filter = tag_filter
        self.data_providers = []
        self.dirs = ["raw", "article"]
        Scraper.create_dirs(self.dirs)

    @staticmethod
    def create_dirs(dirs):
        """Create every directory in ``dirs`` that does not exist yet.

        Missing parent directories are created as well, and a directory
        that already exists is left alone.
        """
        for path in dirs:
            # makedirs(exist_ok=True) replaces the racy isdir()/mkdir() pair
            # and also copes with nested paths.
            os.makedirs(path, exist_ok=True)

    def add_data_provider(self, cls):
        """Instantiate provider class ``cls`` and register the instance.

        A sub-directory named after the provider is created inside each
        base directory in ``self.dirs``.
        """
        ins = cls(tag_filter=self.tag_filter)
        provider_name = ins.get_name_provider()
        Scraper.create_dirs(
            [os.path.join(base, provider_name) for base in self.dirs])
        self.data_providers.append(ins)

    def retrieve_data(self, full_scrape=False):
        """Let every provider prepare and write its raw HTML.

        Args:
            full_scrape: forwarded to each provider's ``prepare_providing``.
        """
        for instance in self.data_providers:
            instance.prepare_providing(full_scrape)
            base_path = os.path.join("raw", instance.get_name_provider())
            instance.write_raw_html(base_path)

    def timed_loop(self, timeout=3600, n=-1):
        """Call ``retrieve_data`` repeatedly, sleeping between the runs.

        Args:
            timeout: seconds to sleep between two runs.
            n: number of runs; a negative value loops forever and ``0``
                does nothing.
        """
        if n < 0:
            while True:
                self.retrieve_data()
                sleep(timeout)
        elif n > 0:
            for i in range(n):
                self.retrieve_data()
                # No sleep after the final run.
                if i != n - 1:
                    sleep(timeout)
# def main_loop(self): | |
# for instance, template in self.data_providers: | |
# instance.prepare_providing() | |
# for article in instance.article_iterator(): | |
# if self.style_file: | |
# article["stylesheet"] = self.style_file | |
# safe_file_name = _safe_file_name(article["file_name"]) | |
# in_file_path = os.path.join( | |
# "raw", safe_file_name + ".html") | |
# template.stream(**article).dump(in_file_path) | |
# out_file_path = os.path.join( | |
# "articles", safe_file_name + ".pdf") | |
# options = {"quiet": ""} | |
# pdfkit.from_file(in_file_path, out_file_path, options) | |
if __name__ == "__main__":
    from data_provider import DevToProvider

    # Seconds between two scrape runs.
    SCRAPE_INTERVAL = 3600

    scraper = Scraper()
    scraper.add_data_provider(DevToProvider)
    # The first run is a full scrape; the timed loop afterwards uses
    # retrieve_data()'s default, which is not a full scrape.
    scraper.retrieve_data(True)
    sleep(SCRAPE_INTERVAL)
    scraper.timed_loop(timeout=SCRAPE_INTERVAL)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TagFilter(object):
    """Accepts tag collections that hold all required and no blacklisted tags."""

    def __init__(self, required_tags=(), blacklisted_tags=()):
        """Store both tag collections as sets.

        Args:
            required_tags: iterable of tags that must all be present.
            blacklisted_tags: iterable of tags of which none may be present.
        """
        self.required_tags = set(required_tags)
        self.blacklisted_tags = set(blacklisted_tags)

    def check_tags(self, tags):
        """Return True when ``tags`` passes the filter.

        Args:
            tags: iterable of tags to test.

        Returns:
            bool: True if every required tag is in ``tags`` and no
            blacklisted tag is.
        """
        # Materialise once: ``tags`` is consulted twice, and a one-shot
        # iterator would be exhausted before the blacklist check.
        tag_set = set(tags)
        return (self.required_tags <= tag_set
                and not (self.blacklisted_tags & tag_set))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sys import platform | |
from os import sep | |
# Characters that are not allowed in Windows file names, mapped to the text
# that replaces them.
_WINDOWS_REPLACEMENTS = {
    '<': ' less than ', '>': ' greater than ', ':': ';', '|': ' pipe ',
    '/': ' forward slash ', '?': ' question mark ',
    '*': ' asterisk ', '"': '\'', '\\': '-',
}


def safe_file_name(file_name, target_platform=None):
    """Return ``file_name`` with characters unsafe for a file name replaced.

    Args:
        file_name: the proposed file name (not a path).
        target_platform: platform identifier in the style of
            ``sys.platform``; defaults to the running platform. Only
            ``'win32'`` triggers the Windows replacements.

    Returns:
        The cleaned name. The running platform's path separator is always
        replaced with ``'-'``, so the result never spans directories.
    """
    if target_platform is None:
        target_platform = platform
    new_name = file_name
    # Really a windows quirk
    if target_platform == 'win32':
        new_name = ''.join(
            _WINDOWS_REPLACEMENTS.get(char, char) for char in file_name)
    return new_name.replace(sep, '-')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment