Skip to content

Instantly share code, notes, and snippets.

@pkienzle
Last active December 13, 2023 05:39
Show Gist options
  • Save pkienzle/ea9c15362abe3f63773a to your computer and use it in GitHub Desktop.
Save pkienzle/ea9c15362abe3f63773a to your computer and use it in GitHub Desktop.
Scan the Zotero database for missing attachments.
#!/usr/bin/env python
from __future__ import print_function

import glob
import os
import shutil
import sqlite3
import sys
import tempfile
from os.path import join as joinpath, expanduser, exists, isabs, realpath
# SQL yielding one single-column row per entry in itemAttachments: the
# attachment path, with every occurrence of the literal 'storage:' replaced
# by the key of the joined items row followed by '/'.
# The LEFT JOIN keeps attachment rows even when no items row matches.
QUERY_ATTACHMENTS="""
SELECT (REPLACE(itemAttachments.path, 'storage:', items.key || '/'))
FROM itemAttachments
LEFT JOIN items ON itemAttachments.itemID=items.itemID
"""
class Zotero(object):
    """Access a local Zotero data directory to check its attachments.

    The data directory is taken from ``$ZOTERO_HOME`` or defaults to
    ``~/Zotero``.  It must contain ``zotero.sqlite``; stored attachments are
    looked up under its ``storage`` subdirectory.  Paths carrying the
    ``attachments:`` prefix are resolved against ``$ZOTERO_ATTACHMENTS``
    (default ``~/Documents/Papers/``).

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    # Prefix marking a path that is relative to the attachment directory.
    _ATTACHMENT_PREFIX = "attachments:"

    def __init__(self, mode="r"):
        """Locate the data directory and open the database.

        :param mode: ``"r"`` (default) opens a temporary copy of the
            database; any other value opens ``zotero.sqlite`` in place.
        :raises RuntimeError: if ``zotero.sqlite`` is not found.
        """
        # Initialised first so close()/__del__ are safe if setup fails below.
        self.sql = None
        self._dbcopy = None
        self.root, location = self._find_root()
        self.database = joinpath(self.root, "zotero.sqlite")
        self.storage = joinpath(self.root, "storage")
        # TODO: lookup attachment path in zotero/profile/prefs.js
        # On mac, this is:
        # ~/Library/Application Support/Zotero/Profiles/$HASH.default/prefs.js
        # user_pref("extensions.zotero.baseAttachmentPath", "...");
        attachment_dir = os.environ.get("ZOTERO_ATTACHMENTS", "~/Documents/Papers/")
        self.attachment_dir = realpath(expanduser(attachment_dir))
        if not exists(self.database):
            msg = ("Zotero database missing from %r." % location
                   + " Set ZOTERO_HOME to path containing zotero.sqlite."
                   + " See https://www.zotero.org/support/zotero_data for details.")
            raise RuntimeError(msg)
        self.sql = self._open_database(mode)
        self.cursor = self.sql.cursor()

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the database on leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the connection and delete the temporary copy, if any.

        Safe to call repeatedly, and on a partially initialised instance.
        """
        sql = getattr(self, "sql", None)
        if sql is not None:
            self.sql = None
            sql.close()
        # Remove the copy only after the connection is closed, since an open
        # file cannot be deleted on every platform.
        dbcopy = getattr(self, "_dbcopy", None)
        if dbcopy is not None:
            self._dbcopy = None
            try:
                os.remove(dbcopy)
            except OSError:
                # Best effort: the copy may already have been removed.
                pass

    def __del__(self):
        """Release the database if the caller never called close()."""
        self.close()

    def _find_root(self):
        """Return ``(data_dir, label)`` for the Zotero data directory.

        ``label`` names where the directory came from and is used in the
        error message raised when the database is missing.
        """
        env_root = os.environ.get("ZOTERO_HOME", None)
        if env_root is not None:
            return expanduser(env_root), "$ZOTERO_HOME"
        return expanduser(joinpath("~", "Zotero")), "~/Zotero"

    def _open_database(self, mode):
        """Return a sqlite3 connection for the requested *mode*.

        In ``"r"`` mode the database is copied to a uniquely named temporary
        file so that we don't interfere with running versions; the copy is
        deleted by :meth:`close`.  Any other mode connects to the original
        file directly.
        """
        if mode != "r":
            return sqlite3.connect(self.database)

        # mkstemp picks a private, unique name in the platform's temp
        # directory, so this works on Windows and cannot collide with (or be
        # pre-created by) another user, unlike a fixed /tmp path.
        handle, dbcopy = tempfile.mkstemp(prefix="zotero-", suffix=".sqlite")
        os.close(handle)
        self._dbcopy = dbcopy
        try:
            shutil.copy(self.database, dbcopy)
            return sqlite3.connect(dbcopy)
        except Exception:
            # Don't leave a partial copy behind when the open fails.
            self.close()
            raise

    def attachments(self):
        """Classify every attachment path recorded in the database.

        Rows with an empty path are skipped.  Paths starting with
        ``attachments:`` are resolved against ``self.attachment_dir``.
        Absolute paths count as linked files; relative paths count as stored
        files and are looked up under ``self.storage``.

        :returns: tuple ``(linked, stored, missing, empty)`` of sets of
            paths.  ``linked`` and ``stored`` hold only paths that exist on
            disk; ``missing`` holds paths of either kind that do not.
            ``empty`` is always an empty set; nothing populates it yet.
        """
        prefix = self._ATTACHMENT_PREFIX
        linked, stored, missing = set(), set(), set()
        for row in self.cursor.execute(QUERY_ATTACHMENTS):
            # TODO: identify item by Title and Creator
            # TODO: identify collection(s) containing item
            path = row[0]
            if not path:
                continue
            if path.startswith(prefix):
                path = joinpath(self.attachment_dir, path[len(prefix):])
            if isabs(path):
                found, full_path = linked, path
            else:
                found, full_path = stored, joinpath(self.storage, path)
            if exists(full_path):
                found.add(path)
            else:
                missing.add(path)
        return linked, stored, missing, set()
def main():
    """Scan the Zotero library and print stored and missing attachments.

    Prints the stored files (sorted) when the library has both stored and
    linked files, then prints the missing files (sorted) when there are any.
    """
    zot = Zotero()
    try:
        # `empty` is part of the returned tuple but is not reported here.
        linked, stored, missing, empty = zot.attachments()
    finally:
        # Release the database even when the scan raises.
        zot.close()

    # NOTE(review): stored files are listed only when linked files also
    # exist -- presumably stored copies are unexpected in a library that
    # otherwise links its files; confirm this is intended.
    if stored and linked:
        print("Files stored in %r:\n "%zot.storage,
              "\n ".join(sorted(stored)))
    if missing:
        print("Missing files:\n ",
              "\n ".join(sorted(missing)))


# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
@verwinv
Copy link

verwinv commented Nov 7, 2019

Works like a charm, thank you @pkienzle!

@matbra
Copy link

matbra commented Feb 23, 2020

Perfect! Thanks a lot, @pkienzle!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment