#!/usr/bin/env python
"""
feed_history - demonstration implementation of incremental feeds.
See:
http://www.mnot.net/drafts/draft-nottingham-atompub-feed-history-07.txt
This is a SAMPLE ONLY. In particular, error handling is not robust, entry
combination is basic, and using pickle for persistence may cause problems
(e.g., unnecessary fetches when the class is changed).
A more robust implementation might store the identity of each archive it
has seen separately, so that mistakes in the published feed don't cause
it to walk all the way backwards. It might also store the identity of the
archive that an entry is found in.
"""
__license__ = """
Copyright (c) 2005-2006 Mark Nottingham <mnot@pobox.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__version__ = "0.4"
import xml.dom.minidom, urllib2, sys
FH = "http://purl.org/syndication/history/1.0"
RSS1 = "http://purl.org/rss/1.0/"
ATOM = "http://www.w3.org/2005/Atom"
RDF = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
class ArchivedFeedAggregator:
    """
    A syndication (RSS 1.0, 2.0 or Atom) feed aggregator that stitches
    archived feeds together.

    @ivar subscription_uri: the feed's location
    @type subscription_uri: URI string
    @ivar last_archive_uri: location of the last archive feed seen
    @type last_archive_uri: URI string
    @ivar _head: the feed's head section
    @type _head: dom element node
    @ivar _entries: the feed's entries
    @type _entries: list of dom element nodes
    """
    def __init__(self, uri):
        self.subscription_uri = uri
        self.last_archive_uri = None
        self._head = None
        self._entries = []
    def update(self):
        """
        Update the feed.
        """
        head, entries = self._fetch(self.subscription_uri)
        complete = self._getComplete(head)
        prev_uri = self._getPrevUri(head)
        if complete:
            # fh:complete means the document carries the feed's entire set
            # of entries; replace what we have rather than combining.
            self._entries = entries
        elif prev_uri:
            if prev_uri != self.last_archive_uri:
                prev_entries = self._walk(prev_uri)
                entries = self._combine(prev_entries, entries)
            self._entries = self._combine(self._entries, entries)
        else:
            self._entries = entries
        self._head = head
        self.last_archive_uri = prev_uri
    def __str__(self):
        """
        Print the titles of all entries in the feed.
        """
        o = []
        for entry in self._entries:
            try:
                o.append(entry.getElementsByTagName("title")[0].childNodes[0].data)
            except (IndexError, AttributeError):
                o.append("-")
        return "\n".join(o)
    def _walk(self, uri):
        """
        Walk back a URI's prev-archive links to aggregate the entries.
        """
        try:
            head, entries = self._fetch(uri)
        except urllib2.HTTPError:
            sys.stderr.write("*** Problem fetching %s; incomplete feed\n" % uri)
            return []
        prev_uri = self._getPrevUri(head)
        if prev_uri and prev_uri != self.last_archive_uri:
            prev_entries = self._walk(prev_uri)
            return self._combine(prev_entries, entries)
        else:
            return entries
    def _combine(self, old_entries, new_entries):
        """
        Given two lists of entries, combine them: new entries win, and
        duplicates in the overlapping window of old entries are dropped.
        """
        new_ids = [self._getID(entry) for entry in new_entries]
        # Only the first len(new_entries) old entries can overlap the new
        # window; dedupe those by ID and keep the rest of the old tail.
        tmp_entries = [entry for entry in old_entries[:len(new_entries)]
                       if self._getID(entry) not in new_ids]
        return new_entries + tmp_entries + old_entries[len(new_entries):]
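    # For example (hypothetical IDs): combining old entries [A, B, C] with
    # new entries [B, D] yields [B, D, A, C] -- the new window wins, the
    # duplicate B is dropped from the old prefix, and the old tail C is kept.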
    def _getID(self, entry):
        """
        Get an entry's unique identifier.
        """
        if entry.localName == 'item':
            if entry.namespaceURI == RSS1:
                return entry.getAttributeNS(RDF, 'about').strip()
            else:
                try:
                    return entry.getElementsByTagName("guid")[0].childNodes[0].data.strip()
                except IndexError:
                    return entry.getElementsByTagName("link")[0].childNodes[0].data.strip()
        elif entry.localName == 'entry' and entry.namespaceURI == ATOM:
            try:
                return entry.getElementsByTagNameNS(ATOM, "id")[0].childNodes[0].data.strip()
            except IndexError:
                try:
                    return entry.getElementsByTagNameNS(ATOM, "link")[0].getAttribute("href").strip()
                except IndexError:
                    pass
        # Fall back to the entry's serialisation as a (weak) identity.
        return entry.toxml()
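    # Per the feed-history draft, the completeness flag is an empty element
    # in the feed head, e.g.:
    #   <fh:complete xmlns:fh="http://purl.org/syndication/history/1.0"/>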
    def _getComplete(self, head):
        """
        Return True if the feed document head carries fh:complete,
        False otherwise.
        """
        for i in head:
            if i.namespaceURI == FH and i.localName == "complete":
                return True
        return False
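    # The link this matches is an Atom link element in the feed head, e.g.
    # (hypothetical href, pointing at the next-older archive document):
    #   <link rel="prev-archive" href="http://example.org/archive/2009/12"/>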
    def _getPrevUri(self, head):
        """
        Get a feed document head's fh:prev-archive value, or None.
        """
        try:  # TODO: absolutise relative URI
            return [i for i in head if (
                i.namespaceURI == ATOM and
                i.localName == "link" and
                i.getAttribute("rel") == "prev-archive"
            )][0].getAttribute("href")
        except IndexError:
            sys.stderr.write("* No previous link found.\n")
            return None
    def _fetch(self, uri):
        """
        Fetch the head and a list of entries for a feed URI.
        """
        sys.stderr.write("* fetching %s\n" % uri)
        dom = xml.dom.minidom.parse(urllib2.urlopen(uri))
        root = dom.documentElement
        if root.getAttribute("version") in ["2.0", "0.94", "0.93", "0.92", "0.91"]:
            # RSS 2.0 family: everything hangs off the channel element.
            channel = root.getElementsByTagName("channel")[0]
            head = [i for i in channel.childNodes if i.localName != "item"]
            entries = [i for i in channel.childNodes if i.localName == "item"]
        elif root.namespaceURI == RDF and root.localName == u'RDF':
            # RSS 1.0: channel and items are siblings under rdf:RDF.
            head = root.getElementsByTagNameNS(RSS1, "channel")[0].childNodes
            entries = root.getElementsByTagNameNS(RSS1, "item")
        elif root.namespaceURI == ATOM and root.localName == u'feed':
            # Atom: the head is every feed child that isn't an atom:entry.
            head = [i for i in root.childNodes
                    if not (i.namespaceURI == ATOM and i.localName == "entry")]
            entries = [i for i in root.childNodes
                       if i.namespaceURI == ATOM and i.localName == "entry"]
        else:
            raise NotImplementedError("Feed Format Not Recognized")
        sys.stderr.write("  %s entries\n" % len(entries))
        return head, entries
if __name__ == "__main__":
import sys
import cPickle as pickle
try:
f = pickle.load(open(sys.argv[1]))
except IndexError:
sys.stderr.write("USAGE: %s file [uri] \n" % sys.argv[0])
sys.exit(1)
except IOError:
try:
f = ArchivedFeedAggregator(sys.argv[2])
except IndexError:
sys.stderr.write("URI must be supplied for a new file.\n")
sys.exit(1)
f.update()
pickle.dump(f, open(sys.argv[1], 'w'), -1)
print f
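
# Example invocation (hypothetical file name and URI):
#   $ python feed_history.py state.pickle http://example.org/feed.atom  # new state file
#   $ python feed_history.py state.pickle                               # incremental update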