@stuartlangridge
Created November 29, 2014
Pelican plugin for webmentions. This will not work out of the box for you; it's too kryogenix.org-specific. But it may help.
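The plugin hooks into Pelican's standard signals via register(), so enabling it is just a matter of pointing Pelican at the file. A minimal sketch of the relevant pelicanconf.py settings, assuming the file is saved as plugins/webmentions.py (the path, plugin name, and site URL here are assumptions for illustration, not part of the gist):

# pelicanconf.py (sketch; names and URL are assumptions)
PLUGIN_PATH = "plugins"         # the plugin derives its cache file path from this
PLUGINS = ["webmentions"]       # assumes the file below is plugins/webmentions.py
LIVESITEURL = "http://www.kryogenix.org"  # your live site; without it the plugin does nothing

The plugin source follows.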
# webmentions: a Pelican plugin that fetches incoming webmentions for each
# article and sends outgoing webmentions for every external link in it.
from pelican import signals
import json, urllib, urlparse, datetime, html5lib, os, codecs

LIVESITEURL = ""
WM_CACHE_PATH = None
# Cache, persisted to webmentions.json across runs: "domains" maps a hostname
# to its discovered webmention endpoint (or False if it has none); "pinged"
# records source+target pairs that have already been pinged.
WM_CACHE = {
    "domains": {},
    "pinged": {}
}

def startup(p):
    """Read settings and load the webmention cache from disk, if present."""
    global LIVESITEURL, WM_CACHE, WM_CACHE_PATH
    LIVESITEURL = p.settings.get("LIVESITEURL")
    if not LIVESITEURL:
        print "Webmentions error: no LIVESITEURL defined in settings"
    WM_CACHE_PATH = os.path.join(p.settings["PLUGIN_PATH"], "..", "webmentions.json")
    if os.path.exists(WM_CACHE_PATH):
        fp = codecs.open(WM_CACHE_PATH, encoding="utf8", mode="r")
        WM_CACHE = json.load(fp)
        fp.close()

def shutdown(p):
    """Write the webmention cache back to disk at the end of the run."""
    global WM_CACHE, WM_CACHE_PATH
    fp = codecs.open(WM_CACHE_PATH, encoding="utf8", mode="w")
    fp.write(json.dumps(WM_CACHE))
    fp.close()

def fetch_webmentions(generator):
    """Fetch mentions of this site from webmention.herokuapp.com and attach
    them to the matching articles as article.webmentions."""
    global LIVESITEURL
    if not LIVESITEURL: return
    site = urlparse.urlparse(LIVESITEURL).netloc
    print "Fetching webmentions for", site
    try:
        fp = urllib.urlopen(
            "https://webmention.herokuapp.com/api/mentions?%s" %
            urllib.urlencode({"site": site}))
        j = json.load(fp)
    except:
        # re-raise so a failed fetch aborts the build noisily
        raise
    # Index each mention by the target URL(s) it points at.
    by_target = {}
    for x in j:
        wm = {
            "name": x.get("name", ""),
            # "published" is milliseconds since the epoch; fall back to an
            # arbitrary date in April 2000 if it is missing
            "published": datetime.datetime.utcfromtimestamp(x.get("published", 954790000000) / 1000),
            "summary": x.get("summary", ""),
            "author_name": x.get("author", {"name": "An unnamed person"}).get("name", "An unnamed person"),
            "author_photo": x.get("author", {"photo": "http://www.gravatar.com/avatar/no?d=mm"}).get("photo", "http://www.gravatar.com/avatar/no?d=mm"),
            "author_url": x.get("author", {"url": ""}).get("url", ""),
            "url": x.get("url", ""),
        }
        if wm["author_photo"] is None:
            wm["author_photo"] = "http://www.gravatar.com/avatar/no?d=mm"
        if wm["url"]:
            wm["parsed_url"] = urlparse.urlparse(wm["url"])
        else:
            wm["parsed_url"] = None
        for t in x["targets"]:
            if t not in by_target:
                by_target[t] = []
            by_target[t].append(wm)
    # Attach the collected mentions to whichever article each target matches.
    for article in list(generator.articles):
        article.webmentions = []
        for target, wms in by_target.items():
            if target.endswith(article.url):
                article.webmentions = wms
        if article.webmentions:
            print "Found %s webmentions for %s" % (len(article.webmentions), article.url)

def send_webmentions(generator):
    """Send a webmention for every external link in every article, skipping
    anything already pinged on a previous run."""
    global LIVESITEURL
    if not LIVESITEURL: return
    counts = {"pinged in past": 0, "no webmentions endpoint": 0, "webmentions endpoint error": 0, "pinged": 0}
    for article in list(generator.articles):
        myurl = "/".join([LIVESITEURL, article.url])
        parser = html5lib.HTMLParser(tree=html5lib.getTreeBuilder("dom"))
        dom = parser.parse(article._content)
        for a in dom.getElementsByTagName("a"):
            href = a.getAttribute("href")
            # only ping absolute external links
            if not href.startswith("http:") and not href.startswith("https:"): continue
            #print "WM:", article.url[:17]+"...", "->", href[:25]+ "...",
            cache_key = myurl + " + " + href
            if cache_key in WM_CACHE["pinged"]:
                #print "done in past."
                counts["pinged in past"] += 1
                continue
            domain = urlparse.urlparse(href).netloc
            # get the webmentions endpoint for this domain, if there is one
            endpoint = WM_CACHE["domains"].get(domain, None)
            if endpoint is None:
                # not seen this domain before: fetch the linked page, discover
                # its endpoint, and cache the result (False means "none")
                try:
                    fp = urllib.urlopen(href)
                    link = None
                    # first preference: an HTTP Link: header with rel="webmention"
                    linkheader = fp.info().getheader("Link")
                    if linkheader:
                        parts = linkheader.split(";")
                        if len(parts) == 2 and parts[1].strip() == 'rel="webmention"':
                            link = parts[0].strip().replace("<", "").replace(">", "")
                    # failing that, a <link rel="webmention"> in the page itself
                    if not link:
                        parser2 = html5lib.HTMLParser(tree=html5lib.getTreeBuilder("dom"))
                        dom2 = parser2.parse(fp.read())
                        for linkel in dom2.getElementsByTagName("link"):
                            if linkel.getAttribute("rel") == "webmention" or \
                                    linkel.getAttribute("rel") == "http://webmention.org/ webmention" or \
                                    linkel.getAttribute("rel") == "webmention http://webmention.org/":
                                link = linkel.getAttribute("href")
                    # last resort: an <a rel="webmention"> in the page
                    if not link:
                        for ael in dom2.getElementsByTagName("a"):
                            if ael.getAttribute("rel") == "webmention" or \
                                    ael.getAttribute("rel") == "http://webmention.org/ webmention" or \
                                    ael.getAttribute("rel") == "webmention http://webmention.org/":
                                link = ael.getAttribute("href")
                    if link:
                        endpoint = link
                    else:
                        endpoint = False
                        #print "no endpoint."
                except:
                    #print "endpoint error."
                    endpoint = False
                WM_CACHE["domains"][domain] = endpoint
            if endpoint == False:
                # we have checked already and there is definitely no endpoint
                #print "no endpoint (cached)."
                counts["no webmentions endpoint"] += 1
                continue
            # now actually ping the endpoint
            try:
                fp2 = urllib.urlopen(endpoint, data=urllib.urlencode({"source": myurl, "target": href}))
                # 200 OK and 202 Accepted both count as a successful ping
                if fp2.getcode() == 200 or fp2.getcode() == 202:
                    #print "ok."
                    counts["pinged"] += 1
                else:
                    #print "failed (code %s)" % fp2.getcode()
                    counts["webmentions endpoint error"] += 1
            except:
                #print "bad endpoint (%s)" % (endpoint,)
                counts["webmentions endpoint error"] += 1
                WM_CACHE["domains"][domain] = False
            # record the attempt so this pair is never pinged again
            WM_CACHE["pinged"][cache_key] = datetime.datetime.now().isoformat()
    print "Sent webmentions: ", ", ".join(["%s: %s" % x for x in counts.items()])

def register():
    signals.article_generator_finalized.connect(fetch_webmentions)
    signals.article_generator_finalized.connect(send_webmentions)
    signals.initialized.connect(startup)
    signals.finalized.connect(shutdown)
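
For reference, the webmentions.json cache that startup() loads and shutdown() writes (it lives alongside the plugin directory, one level above PLUGIN_PATH) ends up shaped like this; the entries shown are illustrative, not from the gist:

{
    "domains": {
        "example.com": "http://example.com/webmention-endpoint",
        "silent.example.net": false
    },
    "pinged": {
        "http://www.kryogenix.org/days/2014/11/29/example/ + http://example.com/post": "2014-11-29T17:24:00"
    }
}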