#!/usr/bin/env python3
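"""
Super Feed - combine a number of feeds into one super feed of Hugo posts.

Feed sources are read from a tab separated file of NAME/URL pairs, and every
feed entry becomes a Markdown post under a YYYY-MM-DD directory in the Hugo
content tree.
"""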

# Standard library
import argparse
import datetime
import hashlib
import os
import re
import time

# 3rd party libraries
import dateparser
import feedparser
import pytz

timezone = pytz.timezone("America/New_York")
now = timezone.localize(datetime.datetime.now())
current_year = now.year
# RFC 822 style build timestamp, e.g. "Mon, 06 Apr 2020 09:30:00 -0400"
feed_built = now.strftime("%a, %d %b %Y %H:%M:%S %z")


class SuperFeed:

    def __init__(self, path=None, content_root_path=None):
        self.entries = {}  # posts keyed by YYYY-MM-DD publication day
        self.feeds = {}    # feed name -> feed URL
        self.feeds_path = path
        self.content_root_path = content_root_path

    def _slugify(self, to_slug):
        """
        Return a slug of the specified string
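
        e.g. _slugify("Hello, World!") -> "hello-world"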
        """
        slug = to_slug
        try:
            slug = re.sub(r"\s+", "-", to_slug).lower()  # whitespace -> hyphens
            slug = re.sub(r"\W+", "-", slug)             # non-word characters -> hyphens
            slug = re.sub(r"--+", "-", slug)             # collapse runs of hyphens
        except Exception as err:
            print("Could not generate slug for [{}]".format(to_slug), err)
        return slug.strip('-')

    def _read_feeds(self):
        """
        Read the tab separated value file at the specified path in order to
        extract a set of NAME\tURL pairs, one per line; lines without a tab
        fall back to being split on a run of two or more whitespace characters.
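
        An illustrative line: "Example Blog\thttps://example.com/feed.xml"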
        """
        feeds = {}

        if self.feeds_path and os.path.exists(self.feeds_path):
            try:
                with open(self.feeds_path, 'r') as fh:
                    for line in fh:
                        if '\t' in line:
                            cols = line.split('\t')
                            feeds[cols[0].strip()] = cols[1].strip()
                        else:
                            # fall back to runs of whitespace as the separator
                            m = re.search(r"(?P<k>.+)\s\s+(?P<v>.+)", line)
                            if m:
                                feeds[m.group('k').strip()] = m.group('v').strip()
            except Exception as err:
                print("Could not read {}. Threw exception:\n{}\n".format(self.feeds_path, err))

        self.feeds = feeds

    def _convert_timestamp_to_key(self, tstamp):
        """
        Takes a timestamp as datetime.datetime and converts it to a YYYY-MM-DD key
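
        e.g. datetime.datetime(2020, 4, 1, 12, 30) -> "2020-04-01"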
        """
        key = None
        try:
            key = tstamp.strftime("%Y-%m-%d")
        except Exception:
            pass
        return key

    def _convert_feed_entry_to_post(self, entry, source=None):
        """
        Takes a feed item from a feedparser feed and creates the text for a Hugo post
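
        The post is Hugo front matter (title, slug, date, link, hash, source,
        draft, exclude_from_rss) followed by the entry's contents.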
        """
        post = None
        link = None

        if entry:
            title = entry['title'].replace('"', '\\"')
            slug = self._slugify(title)

            published_date = None
            if entry.get('published_parsed'):
                published_date = datetime.datetime.fromtimestamp(time.mktime(entry['published_parsed']))
            elif entry.get('published'):
                published_date = dateparser.parse(entry['published'])
            if published_date:
                # only localize naive datetimes; dateparser can return aware ones
                if published_date.tzinfo is None:
                    published_date = pytz.timezone("America/New_York").localize(published_date)
                published_date = published_date.strftime('%Y-%m-%dT%H:%M:%S%z')

            # prefer the entry's link; fall back to the guid when it is a
            # link, then to the href of the first item in the links list
            if entry.get('link'):
                link = entry['link']
            elif entry.get('guidislink') and entry.get('id'):
                link = entry['id']
            elif entry.get('links'):
                link = entry['links'][0].get('href')

            hash_of_link = None
            try:
                hash_of_link = hashlib.sha256(link.encode('utf8')).hexdigest()
            except Exception as err:
                # no usable link; fall back to hashing the current time so the
                # post still gets a unique filename
                hash_of_link = hashlib.sha256("{}".format(datetime.datetime.now()).encode('utf8')).hexdigest()

            # feedparser exposes 'content' as a list of dicts keyed by 'value',
            # while 'summary' is a plain string
            contents = ""
            if entry.get('content'):
                contents = entry['content'][0].get('value', '')
            elif entry.get('summary'):
                contents = entry['summary']
            elif entry.get('value'):
                contents = entry['value']

            post = """---
title: "{title}"
slug: "{slug}"
date: {published_date}
link: {link}
hash: "{hash}"
source: "{source}"
draft: false
exclude_from_rss: false
---
{contents}
""".format(title=title, slug=slug, published_date=published_date, link=link,
           hash=hash_of_link, source=source, contents=contents)

        return {'post': post, 'link': link}

    def _parse_feed(self, url, feed_name=None):
        """
        Parse the specified feed into self.entries, keyed by publication day
        (YYYY-MM-DD), and return the number of entries parsed
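
        Entries accumulate on self.entries across feeds; anything published
        on or before 2020-04-01 is skipped by the hard-coded cutoff below.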
        """
        feed = None
        try:
            feed = feedparser.parse(url)
        except Exception as err:
            print("Could not parse: {}. Threw exception:\n{}".format(url, err))

        entries_parsed = 0
        if feed:
            for e in feed['entries']:
                published_at = None
                try:
                    if e.get('published_parsed'):
                        published_at = datetime.datetime.fromtimestamp(time.mktime(e['published_parsed']))
                    elif e.get('published'):
                        published_at = dateparser.parse(e['published'])
                except Exception as err:
                    print("Could not properly parse datetime: {}. Threw exception:\n{}".format(e, err))
                published_at_key = self._convert_timestamp_to_key(published_at)
                # ISO formatted day keys compare correctly as plain strings
                if published_at_key and published_at_key > "2020-04-01":
                    if published_at_key not in self.entries:
                        self.entries[published_at_key] = []
                    try:
                        self.entries[published_at_key].append(self._convert_feed_entry_to_post(e, source=feed_name))
                        entries_parsed += 1
                    except Exception as err:
                        print("Could not parse entry {}. Threw exception:\n{}".format(e, err))

        return entries_parsed

    def _write_posts(self):
        """
        Write all of the new posts to files
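
        Post filenames are the sha256 of the entry link, so re-running the
        script does not duplicate posts that already exist on disk.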
        """
        new_posts = 0
        for day_key, day_entries in self.entries.items():
            print("Processing {} posts for day {}".format(len(day_entries), day_key))
            # make sure the day key dir exists
            day_path = os.path.join(self.content_root_path, day_key)
            os.makedirs(day_path, exist_ok=True)

            for entry in day_entries:
                if entry.get('link'):
                    post_hash = None
                    try:
                        post_hash = hashlib.sha256(entry['link'].encode('utf8')).hexdigest()
                    except Exception:
                        pass
                    if post_hash:
                        post_fn = "{}.md".format(post_hash)
                        post_path = os.path.join(self.content_root_path, day_key, post_fn)
                        if not os.path.exists(post_path):
                            try:
                                with open(post_path, 'w') as fh:
                                    fh.write(entry['post'])
                                new_posts += 1
                            except Exception as err:
                                print("Could not write post to folder {}. Threw exception:\n{}".format(day_path, err))

        print("Wrote {} new posts".format(new_posts))
        return new_posts

    def parse_all_feeds(self):
        """
        Parse all of the feeds in the SuperFeed
        """
        self._read_feeds()
        for k, v in self.feeds.items():
            print("Downloading latest feed from {}".format(k))
            entries_parsed = self._parse_feed(v, feed_name=k)
            print("\tParsed {} entries".format(entries_parsed))


def main():
    parser = argparse.ArgumentParser(description="Super Feed - Combine a number of feeds into one super feed")
    parser.add_argument("--tsv", dest="tsv", required=True, help="Path to a .tsv containing name\\tfeed on each line")
    parser.add_argument("--output", dest="output", required=True, help="Path to the Hugo content directory to store the feed entries in")
    args = parser.parse_args()

    sf = SuperFeed(path=args.tsv, content_root_path=args.output)
    sf.parse_all_feeds()
    new_posts = sf._write_posts()
    print("{} new posts".format(new_posts))


if __name__ == '__main__':
    main()
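
# Example invocation (file names are illustrative):
#   python3 superfeed.py --tsv feeds.tsv --output content/posts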