#!/usr/bin/env python3
"""Get info given a StackExchange question url.

Usage: se-url2md-title [<so-question-url>...]

To install:

    $ python3 -mpip install requests werkzeug pyperclip

As a library:

    >>> import se_url2info
    >>> next(se_url2info.get_questions(['http://stackoverflow.com/q/10180765'])).title
    'open file with a unicode filename?'
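
Example command-line use (illustrative; the output format follows main() below):

    $ se-url2md-title http://stackoverflow.com/q/10180765
    [open file with a unicode filename?](http://stackoverflow.com/q/10180765)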
"""
import html
import logging
import os
import re
import sys
import time
from collections import OrderedDict, defaultdict, namedtuple
from itertools import zip_longest
from types import SimpleNamespace
from urllib.parse import urlsplit

try:
    import pyperclip
    import requests
    from werkzeug.contrib import cache
except ImportError:  # for setup.py
    cache = requests = pyperclip = None
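# NOTE: werkzeug.contrib.cache only exists in Werkzeug releases before 1.0;
# with newer Werkzeug the equivalent FileSystemCache is provided by the
# separate `cachelib` package.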

appname = "se-url2md-title"
__version__ = "1.3.0"
api_key = "jnO5sR60LMEHPaFPG9cA0A(("  # not a secret
DAY = 86400
script_dir = os.path.dirname(os.path.realpath(__file__))
cache_dir = os.path.join(script_dir, '.cachedir')
fscache = cache and cache.FileSystemCache(cache_dir, threshold=DAY)
logger = logging.getLogger(__name__)

Question = namedtuple('Question', 'id site url')


def get_new_data(api_endpoint, host='api.stackexchange.com', key=api_key):
    now = int(time.time())
    url = 'https://{host}{api_endpoint}&key={key}'.format(**vars())
    r = requests.get(
        url,
        headers={
            'Accept': 'application/json',
            'User-Agent': '{appname}/{__version__}'.format(**globals())
        })
    logger.info('status: %s, headers: %s', r.status_code, r.headers)
    data = SimpleNamespace(**r.json())
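
    # ts() below renders a Unix timestamp as an ISO-8601 UTC string; it is
    # used to stamp each returned item with `published` and `updated` fields.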
    def ts(unix_time):
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(unix_time))

    data.updated_timestamp = ts(now)
    data.api_url = url
    data.backoff = 1
    if hasattr(data, 'items'):
        data.items = [
            SimpleNamespace(
                **item,
                published=data.updated_timestamp,
                updated=ts(item['creation_date'])) for item in data.items
        ]
    logger.debug('got %r', data)
    """
    https://api.stackexchange.com/docs/error-handling
    [
        {
            "error_id": 400,
            "description": "An malformed parameter was passed",
            "error_name": "bad_parameter"
        },
        {
            "error_id": 401,
            "description": "No access_token was passed",
            "error_name": "access_token_required"
        },
        {
            "error_id": 402,
            "description": "An access_token that is malformed, expired, or otherwise incorrect was passed",
            "error_name": "invalid_access_token"
        },
        {
            "error_id": 403,
            "description": "The access_token passed does not have sufficient permissions",
            "error_name": "access_denied"
        },
        {
            "error_id": 404,
            "description": "No matching method was found",
            "error_name": "no_method"
        },
        {
            "error_id": 405,
            "description": "No key was passed",
            "error_name": "key_required"
        },
        {
            "error_id": 406,
            "description": "Access token may have been leaked, it will be invalidated",
            "error_name": "access_token_compromised"
        },
        {
            "error_id": 407,
            "description": "A write operation was rejected",
            "error_name": "write_failed"
        },
        {
            "error_id": 409,
            "description": "A request identified by the given request_id has already been run",
            "error_name": "duplicate_request"
        },
        {
            "error_id": 500,
            "description": "An error was encountered while servicing this request, it has been recorded",
            "error_name": "internal_error"
        },
        {
            "error_id": 502,
            "description": "Some violation of the throttling or request quota contract was encountered",
            "error_name": "throttle_violation"
        },
        {
            "error_id": 503,
            "description": "The method, or the entire API, is temporarily unavailable",
            "error_name": "temporarily_unavailable"
        }
    ]
    """
    r.raise_for_status()
    data.quota_remaining = max(data.quota_remaining, 1)
    seconds_till_utc_midnight = DAY - now % DAY
    logger.info('quota_remaining %d, seconds_till_utc_midnight %d',
                data.quota_remaining, seconds_till_utc_midnight)
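    # Pace requests so the remaining quota lasts until UTC midnight, which is
    # when this script assumes the daily quota window rolls over.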
    data.backoff = max(seconds_till_utc_midnight // data.quota_remaining,
                       data.backoff)
    return data


def parse_question_url(url, qid_pattern=r'/q(?:uestions)?/(\d+)'):
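    """Parse a StackExchange question url into a Question namedtuple.

    Illustrative example (follows from the default regex above and
    hostname2site() below):

    >>> parse_question_url('https://stackoverflow.com/q/10180765')
    Question(id=10180765, site='stackoverflow', url='https://stackoverflow.com/q/10180765')
    """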
    purl = urlsplit(url)
    m = re.match(qid_pattern, purl.path)
    if not m:
        raise ValueError(
            "expected {qid_pattern}, got {url!r}".format(**vars()))
    return Question(
        id=int(m.group(1)), url=url, site=hostname2site(purl.hostname))


def hostname2site(hostname, suffixes=['.stackexchange.com', '.com']):
    """
    >>> hostname2site('askubuntu.com')
    'askubuntu'
    >>> hostname2site('math.stackexchange.com')
    'math'
    """
    for suffix in suffixes:
        if hostname.endswith(suffix):
            return hostname[:-len(suffix)]
    raise ValueError(
        '{hostname!r} has no known suffixes ({suffixes!r})'.format(**vars()))


def get_questions(urls, *, preserve_order=False, chunksize=30):
    if isinstance(urls, str):  # allow a single url as an argument
        urls = [urls]
    # /2.2/questions/{ids}?site={site}
    # {ids} can contain up to 100 semicolon delimited ids (30 really)
    questions = OrderedDict.fromkeys(parse_question_url(url) for url in urls)
    questions_by_site = defaultdict(list)
    for q in questions:
        questions_by_site[q.site].append(q)
    result = []
    for site, same_site_questions in questions_by_site.items():
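        # zip_longest(*[iter(seq)] * n) is the standard "grouper" idiom: it
        # yields chunks of n questions, padding the final chunk with None.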
        for qs in zip_longest(*[iter(same_site_questions)] * chunksize):
            chunk = [q for q in qs if q is not None]
            ids = ";".join(map(str, sorted(q.id for q in chunk)))
            logger.debug('len(ids)=%d', ids.count(";") + 1)
            data = make_request(
                '/2.2/questions/{ids}?site={site}'.format(**vars()))
            # Match API items back to the requested urls by question id; the
            # API does not return items in the same order as the requested ids.
            requested = {q.id: q for q in chunk}
            for item in data.items:
                item.site = site
                item.orig_url = requested[item.question_id].url
            if not preserve_order:
                yield from data.items
            else:
                result += data.items
            logger.debug('len(data.items)=%d', len(data.items))
    if not preserve_order:
        return  # we are done

    order = {(q.id, q.site): i for i, q in enumerate(questions)}
    result.sort(key=lambda q: order[q.question_id, q.site])
    logger.debug('len(result)=%d', len(result))
    yield from result


def make_request(api_endpoint):
    data = fscache.get(key=api_endpoint)
    if not data:
        logger.info('missed cache for {api_endpoint}'.format(**vars()))
        data = get_new_data(api_endpoint)
        # From https://api.stackexchange.com/docs/throttle
        #
        # > While not strictly a throttle, the Stack Exchange API
        # > employs heavy caching and as such no application should
        # > make semantically identical requests more than once a
        # > minute
        #
        # > A dynamic throttle is also in place on a per-method
        # > level. If an application receives a response with the
        # > backoff field set, it must wait that many seconds before
        # > hitting the same method again. For the purposes of
        # > throttling, all /me routes are considered to be identical
        # > to their /users/{ids} equivalent.
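        #
        # Hence the cache timeout below: at least 60 seconds (the "once a
        # minute" rule), or longer if a larger backoff was computed or
        # received from the API.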
        fscache.set(api_endpoint, data, timeout=max(data.backoff, 60))
    return data


def main():
    text = None
    for q in get_questions(sys.argv[1:] or [pyperclip.paste()]):
        # escape "]" so the title stays valid inside the Markdown link text
        title = html.unescape(q.title).replace(']', r'\]')
        text = '[{title}]({q.orig_url})'.format(**vars())
        print(text)
    if pyperclip and text:
        pyperclip.copy(text)  # copy the last text to the clipboard


if __name__ == '__main__':
    sys.exit(main())