Skip to content

Instantly share code, notes, and snippets.

@Guymer Guymer/rss_checker.py
Last active Aug 4, 2019

Embed
What would you like to do?
a Python script to email me about new RSS posts
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# Import modules ...
import HTMLParser
import io
import json
import os
import pyguymer
import requests
import subprocess
import time
import xml
import xml.etree
import xml.etree.ElementTree
# Define settings ...
path = u"/path/to/rss_checker.json"
email = u"you@example.com"
nlim = 30 # [#]
wait = 15.0 # [s]
# Define function ...
def construct_email(parser, feed, title, date, link, content):
# Use feed title for the email subject ...
ans = u"Subject: New post in \"{0:s}\" feed\n".format(feed.text.strip())
# Start the email content ...
ans += u"\n"
# Add the article title ...
ans += u"Post Title: {0:s}\n".format(title.text.strip())
# Add the article date ...
ans += u"Post Date: {0:s}\n".format(date.text.strip())
# Add the article link ...
ans += u"Post Link: {0:s}\n".format(link)
# Add the article description (if present) ...
if content is not None:
ans += u"Post Description:\n{0:s}\n".format(parser.unescape(content.text.strip()))
# Return the answer ...
return ans.encode(u"utf-8")
# Load data file as JSON ...
data = json.load(io.open(path, "rt", encoding = u"utf-8"))
# Start parser ...
pars = HTMLParser.HTMLParser()
# Start session ...
sess = requests.Session()
sess.allow_redirects = True
sess.headers.update({"Accept" : "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"})
sess.headers.update({"Accept-Language" : "en-GB,en;q=0.5"})
sess.headers.update({"Accept-Encoding" : "gzip, deflate"})
sess.headers.update({"DNT" : "1"})
sess.headers.update({"Upgrade-Insecure-Requests" : "1"})
sess.headers.update({"User-Agent" : "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:68.0) Gecko/20100101 Firefox/68.0"})
sess.max_redirects = 5
# Initialize counter ...
n = 0 # [#]
# Loop over feeds ...
for feed, emails in data[u"feeds"].iteritems():
print u"Processing \"{0:s}\" ...".format(feed)
# Download feed ...
src = pyguymer.download_text(sess, feed)
# Load feed as XML ...
root = xml.etree.ElementTree.fromstring(src)
# Determine the feed format ...
if root.tag == u"{http://www.w3.org/2005/Atom}feed":
print u" It is an Atom feed"
# Loop over all entry tags in the feed ...
for entry in root.findall(u"{http://www.w3.org/2005/Atom}entry"):
# Find the link to the article ...
link = entry.find(u"{http://www.w3.org/2005/Atom}id").text.strip()
if not link.startswith(u"http"):
link = entry.find(u"{http://www.w3.org/2005/Atom}link").get(u"href").strip()
if not link.startswith(u"http"):
raise Exception(u"cannot find a link that starts with http")
# Skip this article if it has already been emailed ...
if link in emails:
continue
# Construct email ...
inp = construct_email(
parser = pars,
feed = root.find(u"{http://www.w3.org/2005/Atom}title"),
title = entry.find(u"{http://www.w3.org/2005/Atom}title"),
date = entry.find(u"{http://www.w3.org/2005/Atom}updated"),
link = link,
content = entry.find(u"{http://www.w3.org/2005/Atom}content")
)
# Send email and increment counter ...
proc = subprocess.Popen([u"ssmtp", email], stdin = subprocess.PIPE)
proc.communicate(inp)
if proc.returncode != 0:
raise Exception(u"\"ssmtp\" command failed")
n += 1 # [#]
print u" Sent email about {0:s}".format(link)
# Save article so that it is not sent again ...
data[u"feeds"][feed].append(link)
io.open(path, "wt", encoding = u"utf-8").write(
json.dumps(
data,
ensure_ascii = False,
indent = 4,
encoding = u"utf-8",
sort_keys = True
)
)
# Stop sending emails or wait so that this script does not spam the server ...
if n >= nlim:
print u"Finishing cleanly; sent too many emails."
exit()
time.sleep(wait)
elif root.tag == u"rss":
print u" It is an RSS feed"
# Loop over all item tags in the first channel tag of the feed ...
for item in root.find(u"channel").findall(u"item"):
# Find the link to the article ...
link = item.find(u"link").text.strip()
if not link.startswith(u"http"):
raise Exception(u"cannot find a link that starts with http")
# Skip this article if it has already been emailed ...
if link in emails:
continue
# Construct email ...
inp = construct_email(
parser = pars,
feed = root.find(u"channel").find(u"title"),
title = item.find(u"title"),
date = item.find(u"pubDate"),
link = link,
content = item.find(u"description")
)
# Send email and increment counter ...
proc = subprocess.Popen([u"ssmtp", email], stdin = subprocess.PIPE)
proc.communicate(inp)
if proc.returncode != 0:
raise Exception(u"\"ssmtp\" command failed")
n += 1 # [#]
print u" Sent email about {0:s}".format(link)
# Save article so that it is not sent again ...
data[u"feeds"][feed].append(link)
io.open(path, "wt", encoding = u"utf-8").write(
json.dumps(
data,
ensure_ascii = False,
indent = 4,
encoding = u"utf-8",
sort_keys = True
)
)
# Stop sending emails or wait so that this script does not spam the server ...
if n >= nlim:
print u"Finishing cleanly; sent too many emails."
exit()
time.sleep(wait)
else:
raise Exception(u"\"{0:s}\" is an unrecognized feed format".format(root.tag))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.