Skip to content

Instantly share code, notes, and snippets.

@marknca
Last active August 26, 2022 18:21
Show Gist options
  • Save marknca/c863d166cf91d710c247f6af563ca73b to your computer and use it in GitHub Desktop.
Save marknca/c863d166cf91d710c247f6af563ca73b to your computer and use it in GitHub Desktop.
Read multiple RSS feeds and write their entries as Hugo posts

Usage

List your feeds in a tab separated value text file with the format:

Name of feed\tFEED_URL
Name of feed\tFEED_URL
Name of feed\tFEED_URL

The script has two mandatory arguments, one for the feed file and the other as an output directory within your Hugo content structure.

python3 super-feed.py --tsv FEED_LIST.tsv --output ~/website/content/feeds
dateparser>=0.7.4
feedparser>=5.2.1
#! /usr/bin/env python3
# Standard library
import argparse
import datetime
import hashlib
import os
import re
import pytz
import time
# 3rd party libraries
import dateparser
import feedparser
# Localize "now" to US Eastern time so generated timestamps carry a UTC offset.
timezone = pytz.timezone("America/New_York")
now = datetime.datetime.now()
now = timezone.localize(now)
current_year = now.year
# Human-readable build timestamp, e.g. "26, Aug, 2022, 18:21:00 -0400".
# NOTE(review): current_year and feed_built are not referenced anywhere in this
# script — presumably consumed by Hugo templates or a later revision; confirm.
feed_built = now.strftime("%d, %b, %Y, %H:%M:%S %z")
class SuperFeed():
    """Combine multiple RSS/Atom feeds into per-day Hugo posts.

    Reads a tab-separated feed list (NAME\\tURL), downloads each feed with
    feedparser, converts each entry to a Hugo markdown post, and writes the
    posts into <content_root>/<YYYY-MM-DD>/<sha256-of-link>.md files.
    """

    def __init__(self, path=None, content_root_path=None):
        # Map of "YYYY-MM-DD" day key -> list of {'post': str, 'link': str}.
        self.entries = {}
        # Map of feed name -> feed URL, populated by _read_feeds().
        self.feeds = {}
        # Path to the tab-separated feed list file.
        self.feeds_path = path
        # Hugo content directory that receives the per-day post folders.
        self.content_root_path = content_root_path

    def _slugify(self, to_slug):
        """
        Return a URL-safe slug of the specified string
        """
        slug = to_slug
        try:
            slug = re.sub(r"\s+", "-", to_slug).lower()
            slug = re.sub(r"\W+", "-", slug)
            slug = re.sub(r"--+", "-", slug)
        except Exception as err:
            print("Could not generate slug for [{}]".format(to_slug), err)
        return slug.strip('-')

    def _read_feeds(self):
        """
        Read the tab separated value file at the specified path in order to
        extract a set of values;
        NAME\tURL
        """
        feeds = {}
        if os.path.exists(self.feeds_path):
            try:
                with open(self.feeds_path, 'r') as fh:
                    for i, line in enumerate(fh):
                        if '\t' in line:
                            cols = line.split('\t')
                            feeds[cols[0].strip()] = cols[1].strip()
                        else:
                            # Fall back to runs of 2+ spaces as the separator.
                            m = re.search(r"(?P<k>.+)\s\s+(?P<v>.+)", line)
                            if m:
                                feeds[m.group('k').strip()] = m.group('v').strip()
            except Exception as err:
                print("Could not read {}. Threw exception:\n{}\n".format(self.feeds_path, err))
        self.feeds = feeds

    def _convert_timestamp_to_key(self, tstamp):
        """
        Takes timestamp as datetime.datetime and converts it to YYYY-MM-DD key.
        Returns None when tstamp is None or not a datetime.
        """
        key = None
        try:
            key = tstamp.strftime("%Y-%m-%d")
        except Exception:
            # tstamp may be None (unparseable publish date) — no key then.
            pass
        return key

    def _convert_feed_entry_to_post(self, entry, source=None):
        """
        Takes a feed item from a feedparser feed and creates the text for a
        Hugo post. Returns {'post': markdown text or None, 'link': URL or None}.
        """
        post = None
        link = None
        if entry:
            # Escape double quotes so the title is safe in the YAML front matter.
            title = entry.get('title', '').replace('"', '\\"')
            slug = self._slugify(title)
            published_date = None
            # Prefer feedparser's pre-parsed struct_time; fall back to the raw string.
            if entry.get('published_parsed'):
                published_date = datetime.datetime.fromtimestamp(time.mktime(entry['published_parsed']))
            elif entry.get('published'):
                published_date = dateparser.parse(entry['published'])
            if published_date:
                published_date = pytz.timezone("America/New_York").localize(published_date)
                published_date = published_date.strftime('%Y-%m-%dT%H:%M:%S%z')
            if entry.get('link'):
                link = entry['link']
            elif entry.get('guidislink') and entry.get('id'):
                # BUG FIX: 'guidislink' is a boolean flag meaning the guid *is*
                # the permalink; the URL itself lives in entry['id'].
                link = entry['id']
            elif entry.get('links'):
                # BUG FIX: entry['links'] holds link dicts; the URL is 'href'.
                link = entry['links'][0].get('href')
            # File names are keyed on a stable hash of the link; fall back to a
            # hash of "now" so a missing link still yields a usable filename.
            try:
                hash_of_link = hashlib.sha256(link.encode('utf8')).hexdigest()
            except Exception:
                hash_of_link = hashlib.sha256("{}".format(datetime.datetime.now()).encode('utf8')).hexdigest()
            contents = ""
            if entry.get('content'):
                contents = entry['content']
            elif entry.get('summary'):
                contents = entry['summary']
            elif entry.get('value'):
                contents = entry['value']
            # BUG FIX: feedparser's 'content' is a list of dicts with the text
            # under 'value'. The original tested "'value' in contents" on the
            # list (element membership) and never extracted the text.
            if isinstance(contents, list) and contents:
                contents = contents[0].get('value', '')
            elif isinstance(contents, dict):
                contents = contents.get('value', '')
            post = """---
title: "{title}"
slug: "{slug}"
date: {published_date}
link: {link}
hash: "{hash}"
source: "{source}"
draft: false
exclude_from_rss: false
---
{contents}
""".format(title=title, slug=slug, published_date=published_date, link=link, hash=hash_of_link, source=source, contents=contents)
        return { 'post': post, 'link': link }

    def _parse_feed(self, url, feed_name=None, min_date="2020-04-01"):
        """
        Parse the specified feed into self.entries keyed by publication day.
        Only entries published after min_date (YYYY-MM-DD) are kept. Returns
        a dict of the day keys touched by this call mapped to their new posts
        (the original built this dict but never populated it).
        """
        entries = {}
        feed = None
        try:
            feed = feedparser.parse(url)
        except Exception as err:
            print("Could not parse: {}. Threw exception:\n{}".format(url, err))
        entries_parsed = 0
        if feed:
            for e in feed['entries']:
                published_at = None
                try:
                    if e.get('published_parsed'):
                        published_at = datetime.datetime.fromtimestamp(time.mktime(e['published_parsed']))
                    elif e.get('published'):
                        published_at = dateparser.parse(e['published'])
                except Exception as err:
                    print("Could not properly parse datetime: {}. Threw exception:\n{}".format(e, err))
                published_at_key = self._convert_timestamp_to_key(published_at)
                # Lexicographic compare is safe: keys are zero-padded YYYY-MM-DD.
                if published_at_key and published_at_key > min_date:
                    if published_at_key not in self.entries:
                        self.entries[published_at_key] = []
                    try:
                        converted = self._convert_feed_entry_to_post(e, source=feed_name)
                        self.entries[published_at_key].append(converted)
                        entries.setdefault(published_at_key, []).append(converted)
                        entries_parsed += 1
                    except Exception as err:
                        print("Could not parse entry {}. Threw exception:\n{}".format(e, err))
        return entries

    def _write_posts(self):
        """
        Write all of the new posts to files. Posts whose file already exists
        are skipped. Returns the number of new posts written.
        """
        new_posts = 0
        for day_key, day_entries in self.entries.items():
            print("Processing {} posts for day {}".format(len(day_entries), day_key))
            # make sure the day key dir exists
            os.makedirs(os.path.join(self.content_root_path, day_key), exist_ok=True)
            day_path = os.path.join(self.content_root_path, day_key)
            for entry in day_entries:
                # BUG FIX: a None link made len(entry['link']) raise an uncaught
                # TypeError; skip link-less entries instead (no stable filename).
                if not entry.get('link'):
                    continue
                post_hash = None
                try:
                    post_hash = hashlib.sha256(entry['link'].encode('utf8')).hexdigest()
                except Exception:
                    pass
                if post_hash:
                    post_fn = "{}.md".format(post_hash)
                    post_path = os.path.join(self.content_root_path, day_key, post_fn)
                    # BUG FIX: the original condition ended in "or True", a debug
                    # leftover that rewrote every post on every run.
                    if not os.path.exists(post_path):
                        try:
                            with open(post_path, 'w') as fh:
                                fh.write(entry['post'])
                            new_posts += 1
                        except Exception as err:
                            print("Could not write post to folder {}. Threw exception:\n{}".format(day_path, err))
        print("Wrote {} new posts".format(new_posts))
        return new_posts

    def parse_all_feeds(self):
        """
        Parse all of the feeds in the SuperFeed
        """
        self._read_feeds()
        for k, v in self.feeds.items():
            print("Downloading latest feed from {}".format(k))
            self._parse_feed(v, feed_name=k)
def main():
    """Command-line entry point: parse arguments, build the combined feed,
    and write any new posts into the Hugo content tree."""
    parser = argparse.ArgumentParser(description="Super Feed - Combine a number of feeds into one super feed")
    parser.add_argument("--tsv", dest="tsv", required=True, help="Path to a .tsv containing name\\tfeed on each line")
    parser.add_argument("--output", dest="output", required=True, help="Path to the Hugo content directory to store the feed entries in")
    options = parser.parse_args()

    feed = SuperFeed(path=options.tsv, content_root_path=options.output)
    feed.parse_all_feeds()
    total_written = feed._write_posts()
    print("{} new posts".format(total_written))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment