Skip to content

Instantly share code, notes, and snippets.

Created November 2, 2022 13:30
Show Gist options
  • Save joshmoore/ea8086cc706572324283cf2c49bf1694 to your computer and use it in GitHub Desktop.
Save joshmoore/ea8086cc706572324283cf2c49bf1694 to your computer and use it in GitHub Desktop.
Simple CLI for querying for certain tags
#!/usr/bin/env python
from argparse import (
from collections import (
from datetime import (
import re
import requests
import sys
from scraper_utils import StdoutOrAppend
PY3_TIMEFORMAT = '%Y-%m-%dT%H:%M:%S.%f%z'
PY2_TIMEFORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
from datetime import timezone
utctzinfo = timezone(timedelta(hours=0))
timeformat = PY3_TIMEFORMAT
except ImportError:
print('WARNING: Python 2 ignoring timezone', file=sys.stderr)
utctzinfo = None
timeformat = PY2_TIMEFORMAT
# This is a list of tags that we expect to be present
OME_TAGS = set(["bio-formats",
"ome", "ome-tiff", "ome-xml",
"omero", "omero-figure", "omero-iviewer", "omero-server",
def discourse(urlpath):
headers = {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
"Accept": "application/json"
url = DISCOURSE + urlpath
print('Fetching ' + url, file=sys.stderr)
r = requests.get(url, headers=headers)
return r.json()
parser = ArgumentParser()
parser.add_argument("--stale", default=1, type=int, help="Number of days between now and the last post. Information displayed as part of the html link")
parser.add_argument("--days", "-d", default=4, type=int)
parser.add_argument("--ignore", "-i", default="ignore.txt", type=FileType("r"))
parser.add_argument("--new", "-n", action="store_true", help="Only unreplied posts")
parser.add_argument("--solved", "-S", choices=("true", "false", "ignore"),
parser.add_argument("--output", "-o", default="-", type=StdoutOrAppend())
parser.add_argument("--preview", "-p", choices=("solved", "all"),
parser.add_argument("--style", "-s", choices=("html", "urls", "tsv"), default="html",
help="Format to print results in")
parser.add_argument("--missing", "-m", action="store_true",
help="Look for topics which are missing the tag")
parser.add_argument("--tags", "-t", action="append", default=[], type=str,
help="List of tags to search ('OR')")
parser.add_argument("--skip-group", default="ome", help="Skip topics where the last reply is from a member of this group, set to '' to disable")
ns = parser.parse_args()
tags = set()
if not tags:
# u'tags': [{u'count': 1, u'text': u'introduction', u'id': u'introduction'
data = discourse('/tags')
ts = data.pop('tags')
extras = data.pop('extras')
assert not data
categories = extras.pop('categories')
assert not extras
for t in ts:
text = t['text']
for prefix in ("bioformats", "ome", "idr"):
if text.replace("-", "").startswith(prefix):
assert OME_TAGS.issubset(tags)
print('Discovered tags: {}\n'.format(", ".join(tags)), file=sys.stderr)
old = datetime.utcnow().replace(tzinfo=utctzinfo) - timedelta(days=ns.days)
stale = ns.stale
ometeam = set()
if ns.skip_group:
ometeamr = discourse('/groups/{}/members?limit=50'.format(ns.skip_group))
ometeam = set(u['username'] for u in ometeamr['members'])
topics = []
topics_url = '/latest'
fetch = True
while fetch:
r = discourse(topics_url)
for t in r['topic_list']['topics']:
t['updated'] = max(
datetime.strptime(t['last_posted_at'], timeformat),
datetime.strptime(t['bumped_at'], timeformat))
except ValueError:
print("Using fallback timezone")
t['updated'] = max(
datetime.strptime(t['last_posted_at'], PY2_TIMEFORMAT),
datetime.strptime(t['bumped_at'], PY2_TIMEFORMAT))
delta = ( - t['updated'].date()).days
t['days'] = ""
if delta > stale:
t['days'] = "Last updated: %s days ago" % delta
alltags = [x.replace("omero-", "") for x in t['tags']]
alltags = [x.replace("bio-formats", "bf") for x in alltags]
t['alltags'] = ','.join(alltags)
t['link'] = DISCOURSE + '/t/{id}/{highest_post_number}'.format(**t)
newer = t['updated'] > old
except TypeError:
newer = t['updated'] > old.replace(tzinfo=None)
if newer:
# Pinned posts will be first but may be older than the cutoff
elif not t['pinned']:
fetch = False
topics_url = r['topic_list']['more_topics_url']
print('Fetched {} topics from {} to {}\n'.format(
len(topics), topics[-1]['updated'], topics[0]['updated']), file=sys.stderr)
with ns.ignore as f:
ignore_links = set(
except IOError:
ignore_links = set()
def remove_formatting(txt, truncate=0):
s = re.sub('\<[^>]*\>', ' ', txt)
s = re.sub(r'\s+', ' ', s)
s = s.strip()
if truncate and len(s) > truncate:
s = s[:(truncate - 1)] + '…'
return s
def get_last_post(t):
thread = discourse('/t/{}'.format(t['id']))
lastpost = thread['post_stream']['posts'][-1]
return lastpost
def keep_or_skip_post(t):
Check whether the topic should be shown (reason: None) or skipped (reason: str)
Return: (reason, thread)
reasons = (
(t['closed'], 'closed'),
(t['archived'], 'archived'),
(not (set(t['tags']) & tags) and not ns.missing, 'no-tag'),
(ns.solved == "true" and not t['has_accepted_answer'], 'solved'),
( and t['highest_post_number'] > 1, '???'),
(t['last_poster_username'] in ometeam, 'responded'),
(t['link'] in ignore_links, 'ignored'),
for check, reason in reasons:
if check:
return reason, t
lastpost = None
t['preview'] = ''
if ns.preview == 'all':
lastpost = get_last_post(t)
t['preview'] = "Preview: " + remove_formatting(lastpost['cooked'], 80)
# Accepted answers: check whether there's a post after the accepted one
# since this might be a follow-up problem
if ns.solved == "false" and t['has_accepted_answer']:
if not lastpost:
lastpost = get_last_post(t)
if lastpost['accepted_answer']:
return 'accepted', t
t['preview'] = 'Solved? ' + remove_formatting(lastpost['cooked'], 80)
return None, t
if == "urls":
format = '{link}'
elif == "tsv":
format = "{n}\t{updated}\t{last_poster_username}\t{link}\t{title}"
format = '<li>[???] <a href="{link}">[sc:{alltags}] {title} ({last_poster_username})</a> {preview}&nbsp {days}</li>'
standup_forums = []
count = 0
skipped = defaultdict(int)
for t in topics:
reason, t = keep_or_skip_post(t)
if reason:
skipped[reason] += 1
standup_forums.append(format.format(n=count, **t))
skipped = ["{}: {}".format(k, v) for k, v in skipped.items()]
skipped = ", ".join(skipped)
print('Skipped: {}\n'.format(skipped), file=sys.stderr)
with ns.output as f:
if standup_forums:
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment