Skip to content

Instantly share code, notes, and snippets.

@fschulze
Created May 23, 2023 14:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fschulze/157070eefc05292bcfb8beb1cd4084b2 to your computer and use it in GitHub Desktop.
Save fschulze/157070eefc05292bcfb8beb1cd4084b2 to your computer and use it in GitHub Desktop.
from devpi_server.filestore import FileEntry
from devpi_server.main import get_pluginmanager
from devpi_server.main import _main
from pluggy import HookimplMarker
import sys
# pluggy marker for the "devpiserver" project name; applied as a decorator to
# the hook implementation on the Plugin class below.
hookimpl = HookimplMarker("devpiserver")
def main():
    """Start devpi-server's ``_main`` with ``Plugin`` registered.

    Fetches devpi-server's plugin manager, registers a ``Plugin`` instance
    on it, then hands control to ``_main`` together with the process
    command line (``sys.argv``).
    """
    pluginmanager = get_pluginmanager()
    pluginmanager.register(Plugin())
    _main(pluginmanager, sys.argv)
class Plugin:
    """Plugin that rewrites outdated ``hash_spec`` values of toxresult files."""

    @hookimpl
    def devpiserver_cmdline_run(self, xom):
        """Recompute and store the hash of every ``toxresult`` file entry.

        Iterates over all ``STAGEFILE`` keys at the transaction's current
        serial. For each entry that has a value and a ``last_modified``,
        lives on a non-mirror index, and whose link has ``rel == 'toxresult'``,
        the checksum of the stored file is compared with the recorded
        ``hash_value``. On mismatch the new ``hash_spec`` is written to the
        file entry's metadata and to the link data of the project version.

        Args:
            xom: Object passed in by the hook caller; this method uses its
                ``log``, ``keyfs`` and ``model`` attributes.

        Returns:
            Always ``0``.
            NOTE(review): presumably a non-None result makes devpi-server
            exit instead of serving -- confirm against the hook spec.
        """
        users = {}
        stages = {}

        def get_user_stage(username, index):
            """Return ``(user, stage)`` for the given names, caching lookups."""
            user = users.get(username)
            if user is None:
                user = users[username] = xom.model.get_user(username)
            # Index names are only unique per user, so the cache key must
            # include the user name; keying on the index name alone would
            # hand back another user's stage for a same-named index.
            stage_key = (username, index)
            stage = stages.get(stage_key)
            if stage is None:
                stage = stages[stage_key] = user.getstage(index)
            return (user, stage)

        log = xom.log
        keyfs = xom.keyfs
        # Entries are modified below, so a write transaction is required.
        keyfs.restart_as_write_transaction()
        tx = keyfs.tx
        log.info("Checking at serial %s" % tx.at_serial)
        keys = (keyfs.get_key('STAGEFILE'),)
        relpaths = tx.iter_relpaths_at(keys, tx.at_serial)
        for item in relpaths:
            if item.value is None:
                # Nothing stored for this key at this serial.
                continue
            key = keyfs.get_key_instance(item.keyname, item.relpath)
            entry = FileEntry(key, item.value)
            if not entry.last_modified:
                # NOTE(review): presumably means no file content is stored
                # for this entry -- confirm.
                continue
            (user, stage) = get_user_stage(entry.user, entry.index)
            if stage.ixconfig["type"] == "mirror":
                continue
            linkstore = stage.get_linkstore_perstage(
                entry.project, entry.version, readonly=False)
            # Exactly one link is expected per entry path; anything else
            # raises ValueError from the tuple unpacking.
            (link,) = linkstore.get_links(entrypath=entry.relpath)
            if link.rel != 'toxresult':
                continue
            checksum = entry.file_get_checksum(entry.hash_type)
            if entry.hash_value != checksum:
                hash_spec = f"{entry.hash_type}={checksum}"
                log.info(f"New hash_spec {hash_spec!r} for {entry.relpath}")
                # Update the file entry's own metadata ...
                with entry.key.update() as meta:
                    meta["hash_spec"] = hash_spec
                # ... and the copy kept in the project version's link data.
                link.linkdict["hash_spec"] = hash_spec
                stage.key_projversion(
                    entry.project, entry.version).set(linkstore.verdata)
        return 0
# Run the entry point only when this file is executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment