@blackle
Created June 26, 2024 07:08
wikipedia perennial sources changes RSS feed generator
#!/usr/bin/env python3
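# Polls the Wikipedia "Reliable sources/Perennial sources" page, diffs the
# sources table between the start of last month and the start of this month,
# and serves the result as an Atom feed over HTTP.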
import json
import requests
import wikitextparser as wtp
from difflib import SequenceMatcher
from datetime import datetime, timezone
import dateutil.relativedelta
import schedule
import http.server
import socketserver
import threading
import time
from feedgen.feed import FeedGenerator
from urllib.parse import urlparse
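
# Icon URL and human-readable label for each status code used in the
# perennial sources table.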
STATUSES = {
"gr": ("https://upload.wikimedia.org/wikipedia/commons/thumb/5/50/Yes_Check_Circle.svg/20px-Yes_Check_Circle.svg.png", "Generally reliable"),
"nc": ("https://upload.wikimedia.org/wikipedia/commons/thumb/1/12/Achtung-orange.svg/20px-Achtung-orange.svg.png", "No consensus, unclear, or additional considerations apply"),
"gu": ("https://upload.wikimedia.org/wikipedia/commons/thumb/4/41/Argentina_-_NO_symbol.svg/20px-Argentina_-_NO_symbol.svg.png", "Generally unreliable"),
"d": ("https://upload.wikimedia.org/wikipedia/en/thumb/8/8b/Stop_hand.svg/20px-Stop_hand.svg.png", "Deprecated"),
"b=y": ("https://upload.wikimedia.org/wikipedia/commons/thumb/7/7c/X-circle.svg/20px-X-circle.svg.png", "Blacklisted"),
}
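
# Normalized edit distance between two strings: 0.0 means identical,
# 1.0 means nothing in common.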
def editdistance(a, b):
    s = SequenceMatcher(None, a, b)
    return 1 - s.ratio()

def distance_below_threshold(a, b):
    return editdistance(str(a), str(b)) < .25  # arbitrary choice
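
# Fetch the last revision of the perennial sources page made before the given
# date (YYYY-MM-DD) and return its table as a set of
# (title, status, description) tuples.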
def get_table_for_date(ymd):
    WIKI_URL = f"https://en.wikipedia.org/w/api.php?action=query&prop=revisions&titles=Wikipedia:Reliable_sources/Perennial_sources&rvslots=*&rvprop=timestamp|ids|content&format=json&formatversion=2&rvstart={ymd}T00:00:00.000Z&rvlimit=1"
    res = requests.get(WIKI_URL)
    data = res.json()
    rev = data['query']['pages'][0]['revisions'][0]['slots']['main']
    content = rev['content']
    parsed = wtp.parse(content)
    sources = [s for s in parsed.sections if (s.title and "Sources" in s.title)][0]
    table = sources.tables[0].data()
    rows = []
    for row in table:
        if row[0] == "Source":
            continue
        title = wtp.parse(row[0]).plain_text().strip()
        status = wtp.parse(row[1]).templates[0].arguments
        status = ",".join([arg.value for arg in status])
        desc = wtp.parse(row[4]).plain_text().strip()
        rows.append((title, status, desc))
    return set(rows)
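
# Compare the tables at two dates and render the changes as (title, HTML body).
# Rows are matched up by exact title/description first, then by fuzzy
# similarity, so edited entries show up as a diff rather than as a removal
# plus an addition.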
def rich_diff_for_span(start_date, end_date):
    start = get_table_for_date(start_date)
    end = get_table_for_date(end_date)
    start_d = start - end
    end_d = end - start
    pairs = []
    found = set()
    # First pass: pair rows whose title or description matches exactly
    for s in start_d:
        for e in end_d:
            if (e[0] == s[0] or e[2] == s[2]) and e not in found:
                pairs.append([s, e])
                found.add(s)
                found.add(e)
                break
    start_d = start_d - found
    end_d = end_d - found
    found = set()
    # Second pass: pair the leftovers by fuzzy similarity
    for s in start_d:
        for e in end_d:
            if distance_below_threshold(e, s) and e not in found:
                pairs.append([s, e])
                found.add(s)
                found.add(e)
                break
    start_d = start_d - found
    end_d = end_d - found
    pairs = sorted(pairs, key=lambda x: x[0])
    removed = sorted(list(start_d))
    added = sorted(list(end_d))
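    # Helpers that render a row (or an old/new pair of rows) as HTML.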
    def body_to_html(body):
        return body.replace("\n\n", "</p><p>").replace("\n", "</p><p>")

    def get_title_diff(a, b):
        title = b[0]
        if b[0] != a[0]:
            title = f"<s>{a[0]}</s> {b[0]}"
        return title

    def format_status(a):
        statuses = a[1].split(",")
        statuses = [s for s in statuses if s in STATUSES]
        imgs = " ".join([f"<img src='{STATUSES[s][0]}'/>" for s in statuses])
        msgs = ", ".join([STATUSES[s][1] for s in statuses])
        return f"<p>{imgs} {msgs}</p>"

    def get_description_diff(a, b):
        sq = SequenceMatcher(None, a[2], b[2])
        out = ""
        for tag, i1, i2, j1, j2 in sq.get_opcodes():
            a_ = a[2][i1:i2]
            b_ = b[2][j1:j2]
            if tag == 'replace':
                out += f'<s>{a_}</s><b>{b_}</b>'
            if tag == 'delete':
                out += f'<s>{a_}</s>'
            if tag == 'insert':
                out += f'<b>{b_}</b>'
            if tag == 'equal':
                out += f'<span style="opacity:0.7">{b_}</span>'
        return f"<p>{out}</p>"

    def get_status_diff(a, b):
        if a[1] == b[1]:
            return format_status(b)
        return f"<s>{format_status(a)}</s>{format_status(b)}"

    def format_pair(a, b):
        title = get_title_diff(a, b)
        return f"<h3>{title}</h3>\n{get_status_diff(a,b)}\n{get_description_diff(a, b)}\n<hr/>\n"

    def format_entry(a):
        return f"<h3>{a[0]}</h3>\n{format_status(a)}\n<p>{body_to_html(a[2])}</p>\n<hr/>\n"
title = f"Changes from {start_date} to {end_date}"
out = ""
if pairs:
out += "<h2>Changed</h2>\n"
for a, b in pairs:
out += format_pair(a, b)
if removed:
out += "<h2>Removed</h2>\n"
for a in removed:
out += f"<s>{format_entry(a)}</s>"
if added:
out += "<h2>Added</h2>\n"
for a in added:
out += format_entry(a)
return (title, out)
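
# Latest (date, title, body) report, regenerated by update_report().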
report = (datetime.today().replace(day=1), "", "")
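
# Rebuild the report for the previous calendar month (first day of last month
# up to the first day of the current month).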
def update_report():
    global report
    end_date = datetime.today().replace(day=1)
    start_date = end_date - dateutil.relativedelta.relativedelta(months=1)
    report_title, report_body = rich_diff_for_span(start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'))
    date = start_date.replace(tzinfo=timezone.utc)
    report = (date, report_title, report_body)
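
# Wrap the current report in a single-entry Atom feed.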
def genfeed():
    global report
    date, report_title, report_body = report
    fg = FeedGenerator()
    fg.id('https://lethargic.talkative.fish:9187/')
    fg.title('Wikipedia perennial sources updates')
    fg.author({'name': 'Wikipedia Editors'})
    fg.language('en')
    fe = fg.add_entry()
    fe.id(date.strftime('%Y-%m-%d'))
    fe.title(report_title)
    fe.content(report_body, type="xhtml")
    fe.published(date)
    return fg
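
# Serves the Atom feed at the root path.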
class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
    def do_GET(self):
        parsedpath = urlparse(self.path)
        if parsedpath.path == '/':
            fg = genfeed()
            self.send_response(200)
            self.send_header("Content-type", "application/atom+xml; charset=utf-8")
            self.end_headers()
            fg.atom_file(self.wfile, pretty=True)
            return
        # Anything other than the feed root gets a 404 instead of an empty reply
        self.send_error(404)
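
# Handle each HTTP request in its own thread.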
class ThreadingSimpleServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    pass
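
# Build the initial report, refresh it once a day, and serve the feed.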
if __name__ == "__main__":
update_report()
PORT = 9187
print("port:",PORT)
socketserver.TCPServer.allow_reuse_address = True
server = ThreadingSimpleServer(("", PORT), MyHttpRequestHandler)
schedule.every().day.do(update_report)
def schedule_thread():
while True:
schedule.run_pending()
time.sleep(1)
thread = threading.Thread(target=schedule_thread)
thread.start()
server.serve_forever()
print(report)