Last active
August 15, 2021 23:51
-
-
Save danmackinlay/2d245f2905feb0550a25e4115c1e9600 to your computer and use it in GitHub Desktop.
script to decorate .Rmd files with bibliography files from my local zotero installation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python | |
""" | |
Export Zotero bibliographies for my blog using Better BibTeX pull-export support.
http://retorque.re/zotero-better-bibtex/exporting/pull | |
Usage: | |
From the root of a blogdown blog, run | |
``` | |
python zot_bib.py path/to/file.Rmd | |
``` | |
It will annotate the file with the following YAML header tags:
`bibupdated`, `nocite`, `bibliography`, `link-citations`, `reference-section-title`.
If you wish to use a custom zotero collection name, set `zotcollpath` in the YAML metadata. | |
This exports both YAML and BibTeX because BibTeX (in fact BibLaTeX) is widely supported but YAML renders more consistently in RMarkdown. | |
This looks asynchronous, but files are processed one at a time (each request is awaited before the next one starts), and inspecting the network traffic confirms the requests are sequential.
If it were any slower I would fix that, but Zotero would possibly be the bottleneck anyway.
""" | |
from pathlib import Path | |
import asyncio | |
# import aiofiles | |
import httpx | |
import sys | |
import re | |
from ruamel.yaml import YAML | |
from ruamel.yaml.error import YAMLError, YAMLStreamError, YAMLWarning, YAMLFutureWarning | |
import datetime | |
from random import shuffle | |
from socket import timeout | |
import itertools | |
# Fallback values; not referenced anywhere else in the visible code.
# NOTE(review): presumably intended as a retry back-off delay — confirm or remove.
BASE_FALLBACK = 0.01
FALLBACK = BASE_FALLBACK
# Upper bound on how many times the HTTP requests for one file are attempted.
MAX_ATTEMPTS = 10
# Shared round-trip ('rt') YAML handler used for both loading and dumping headers.
yaml = YAML(typ='rt')
# Block appended to / stripped from the end of a document body.
# NOTE(review): blank lines inside this literal may have been lost when the
# file was extracted — confirm against the original before relying on it.
REFSTRING = """
## References
::: {#refs}
:::
"""
def read(fname):
    """Split a document into header metadata and body text.

    Two header styles are recognised:

    * A YAML block whose first line is ``---`` and which is closed by a
      ``---`` or ``...`` line.  It is parsed with the module-level ``yaml``
      handler.  If no closing line exists, the header runs to end of file.
    * Otherwise, a leading run of ``key: value`` lines.  The body starts at
      the first line that contains no ``:``.

    In both styles metadata keys are lower-cased.

    Args:
        fname: Path of the file to read; it is decoded as UTF-8.

    Returns:
        A ``(metadata, content)`` tuple: a dict of header values and the
        body as a single string.  An empty file yields ``({}, "")``.

    Raises:
        YAMLWarning, YAMLFutureWarning, YAMLError, YAMLStreamError:
            re-raised unchanged after the offending file name is printed.
    """
    with open(fname, 'r', encoding='utf8') as fp:
        lines = fp.readlines()
    if not lines:
        return {}, ""

    metadata = {}
    if lines[0].rstrip('\n') == '---':  # YAML header
        # Locate the closing terminator.  Default to "end of file" so that a
        # header without a terminator (or a file that is just `---`) is
        # handled instead of leaving the index undefined.
        end = len(lines)
        for idx in range(1, len(lines)):
            # rstrip so a terminator on the last line without a trailing
            # newline is still recognised.
            if lines[idx].rstrip('\n') in ('---', '...'):
                end = idx
                break
        try:
            # The terminator itself is not part of the parsed text.
            parsed = yaml.load("".join(lines[1:end]))
        except (YAMLWarning, YAMLFutureWarning, YAMLError, YAMLStreamError):
            # Warnings are re-raised too: they can indicate real failures.
            print("YAML problem in {}".format(fname))
            raise
        # An empty header parses to None; treat it as "no metadata".
        if parsed is None:
            parsed = {}
        # Postprocess to make the data consistent.
        for key in parsed:
            metadata[key.lower()] = parsed[key]
        body_start = end + 1
    else:
        # Plain `key: value` header: consume lines until one has no colon.
        body_start = len(lines)
        for idx, line in enumerate(lines):
            key, sep, value = line.partition(':')
            if not sep:
                # First non-header line: the body begins here, so nothing
                # from the body is dropped.
                body_start = idx
                break
            metadata[key.lower()] = value.strip()

    return metadata, "".join(lines[body_start:])
def add_refstring(content):
    """Return *content* ending with ``REFSTRING``.

    The block is appended only when *content* does not already end with it,
    so calling this repeatedly does not stack duplicate blocks.
    """
    already_present = content.endswith(REFSTRING)
    return content if already_present else content + REFSTRING
def rm_refstring(content):
    """Return *content* with a trailing ``REFSTRING`` cut off.

    Text that does not end with ``REFSTRING`` is returned unchanged.
    """
    if not content.endswith(REFSTRING):
        return content
    # The cut point is one character past where the trailing block starts,
    # so the block's first character is kept in the result.
    # NOTE(review): with REFSTRING as defined in this file that character is
    # a newline — confirm this is the intent rather than an off-by-one.
    keep = len(content) - len(REFSTRING) + 1
    return content[:keep]
def write(fname, metadata, content):
    """Overwrite *fname* with a YAML header followed by the body.

    The output is a ``---`` line, *metadata* dumped through the module-level
    ``yaml`` handler, a second ``---`` line, and then *content* verbatim.
    The file is written as UTF-8.
    """
    fence = '---\n'
    with open(fname, 'w', encoding='utf8') as out:
        out.write(fence)
        yaml.dump(metadata, out)
        out.write(fence)
        out.write(content)
def _write_if_changed(path, text):
    """Write *text* to *path* (UTF-8) unless it already holds exactly that text.

    A missing file is treated as empty.  Returns True when a write happened.
    """
    try:
        with open(path, 'r', encoding='utf8') as fp:
            existing = fp.read()
    except FileNotFoundError:
        existing = ""
    if existing == text:
        return False
    with open(path, 'w', encoding='utf8') as fp:
        fp.write(text)
    return True


async def massage_one_file(fname, client):
    """Sync one document with its exported bibliography files.

    The collection name is the ``zotcollpath`` metadata value, defaulting to
    the file's stem; a collection named ``_index`` is skipped.  The function
    requests ``<collection>.yaml`` and ``<collection>.bib`` from the local
    ``better-bibtex`` HTTP endpoint on 127.0.0.1:23119 and then:

    * on 200 for the YAML export, stores it next to *fname*; if it changed,
      sets the bibliography-related header tags.  The references block is
      added to the body either way;
    * on 404 for the YAML export, deletes any sibling ``.yaml``/``.bib``
      files and removes the ``nocite``, ``bibliography`` and ``bibupdated``
      tags and the references block;
    * on 200 for the BibTeX export, stores it next to *fname* if it changed.

    The document itself is rewritten only when its metadata or body changed.

    Args:
        fname: ``pathlib.Path`` of the document to process.
        client: HTTP client whose awaitable ``get`` is used for the requests
            (``main`` passes an ``httpx.AsyncClient``).

    Returns:
        None.
    """
    stem = str(fname.stem)
    metadata, content = read(fname)
    zotcollpath = metadata.get("zotcollpath", stem)
    if zotcollpath == "_index":
        print("skipping ", fname)
        return

    yaml_path = fname.with_suffix('.yaml')
    bib_path = fname.with_suffix('.bib')
    updated = False

    # The whole request sequence is retried on HTTP errors, up to
    # MAX_ATTEMPTS times; the first fully successful pass ends the loop.
    for _ in range(MAX_ATTEMPTS):
        try:
            r = await client.get(
                'http://127.0.0.1:23119/better-bibtex/'
                'collection?/1/{}.yaml'.format(
                    zotcollpath
                ),
                timeout=5.0
            )
            if r.status_code == 200:
                print("FOUND ", yaml_path)
                # set this otherwise it will sometimes guess ISO-8859-1/ascii
                # see `r.apparent_encoding`
                r.encoding = 'utf-8'
                if _write_if_changed(yaml_path, r.text):
                    # Only touch the header when the bibliography changed, so
                    # unchanged documents are not rewritten on every run.
                    metadata['bibupdated'] = datetime.datetime.now(
                    ).replace(microsecond=0).isoformat()
                    metadata["nocite"] = ["@*"]
                    metadata["bibliography"] = "{}.yaml".format(stem)
                    metadata["link-citations"] = True
                    metadata["reference-section-title"] = "References"
                    updated = True
                newcontent = add_refstring(content)
                if newcontent != content:
                    updated = True
                    content = newcontent
            elif r.status_code == 404:
                print("MISSING ", yaml_path)
                # Delete each export independently: a missing .yaml must not
                # prevent a stale .bib from being removed.
                for stale in (yaml_path, bib_path):
                    try:
                        stale.unlink()
                    except FileNotFoundError:
                        pass
                for key in ("nocite", "bibliography", "bibupdated"):
                    if key in metadata:
                        del metadata[key]
                        updated = True
                newcontent = rm_refstring(content)
                if newcontent != content:
                    updated = True
                    content = newcontent
            else:
                r.raise_for_status()

            ## bibfile version
            r = await client.get(
                'http://127.0.0.1:23119/better-bibtex/'
                'collection?/1/{}.bib'.format(
                    zotcollpath
                ),
                timeout=5.0
            )
            if r.status_code == 200:
                print("FOUND ", bib_path)
                r.encoding = 'utf-8'
                # The .bib file is not referenced from the header, so a
                # change here does not mark the document as updated.
                _write_if_changed(bib_path, r.text)
            elif r.status_code == 404:
                print("MISSING ", bib_path)
            else:
                r.raise_for_status()
            break
        except httpx.HTTPError as e:
            print("ERROR", fname, repr(e))

    # we write changes if something actually changed.
    if updated:
        print(fname, "updated")
        write(fname, metadata, content)
async def main(glb0="content/**/*.Rmd", *glbs):
    """Process every file matched by the given glob patterns.

    Each pattern is expanded relative to ``Path('')``.  The matches are
    shuffled and then handed to ``massage_one_file`` one at a time, all
    sharing a single ``httpx.AsyncClient``.

    Args:
        glb0: First glob pattern; defaults to all ``.Rmd`` files under
            ``content``.
        *glbs: Any further glob patterns.
    """
    root = Path('')
    globbers = [root.glob(pattern) for pattern in (glb0, *glbs)]
    paths = [match for globber in globbers for match in globber]
    shuffle(paths)
    async with httpx.AsyncClient() as client:
        # Each file is awaited before the next starts, so processing is sequential.
        for fname in paths:
            await massage_one_file(fname, client)
# Script entry point: command-line arguments are forwarded to main() as glob
# patterns; with no arguments main() falls back to its default pattern.
if __name__ == "__main__":
    asyncio.run(main(*sys.argv[1:]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment