Skip to content

Instantly share code, notes, and snippets.

@will-moore
Created February 23, 2023 17:57
Show Gist options
  • Save will-moore/56f03cd126dcac9981bceeb8e7cdb393 to your computer and use it in GitHub Desktop.
Save will-moore/56f03cd126dcac9981bceeb8e7cdb393 to your computer and use it in GitHub Desktop.
OMERO: Upload files in a directory to create a new Fileset to replace an existing Fileset
import argparse
import locale
import os
import platform
import sys
import omero.clients
from omero.cli import cli_login
from omero.model import ChecksumAlgorithmI
from omero.model import NamedValue
from omero.model.enums import ChecksumAlgorithmSHA1160
from omero.rtypes import rstring, rbool
from omero_version import omero_version
from omero.gateway import BlitzGateway
def get_files_for_fileset(fs_path):
filepaths = []
for path, subdirs, files in os.walk(fs_path):
for name in files:
print(os.path.join(path, name))
# If we want to ignore chunks...
# if ".z" in name or ".xml" in name:
filepaths.append(os.path.join(path, name))
return filepaths
def create_fileset(files):
"""Create a new Fileset from local files."""
fileset = omero.model.FilesetI()
for f in files:
entry = omero.model.FilesetEntryI()
entry.setClientPath(rstring(f))
fileset.addFilesetEntry(entry)
# Fill version info
system, node, release, version, machine, processor = platform.uname()
client_version_info = [
NamedValue('omero.version', omero_version),
NamedValue('os.name', system),
NamedValue('os.version', release),
NamedValue('os.architecture', machine)
]
try:
client_version_info.append(
NamedValue('locale', locale.getdefaultlocale()[0]))
except:
pass
upload = omero.model.UploadJobI()
upload.setVersionInfo(client_version_info)
fileset.linkJob(upload)
return fileset
def create_settings():
"""Create ImportSettings and set some values."""
settings = omero.grid.ImportSettings()
settings.doThumbnails = rbool(True)
settings.noStatsInfo = rbool(False)
settings.userSpecifiedTarget = None
settings.userSpecifiedName = None
settings.userSpecifiedDescription = None
settings.userSpecifiedAnnotationList = None
settings.userSpecifiedPixels = None
settings.checksumAlgorithm = ChecksumAlgorithmI()
s = rstring(ChecksumAlgorithmSHA1160)
settings.checksumAlgorithm.value = s
return settings
def upload_files(proc, files, client):
"""Upload files to OMERO from local filesystem."""
ret_val = []
for i, fobj in enumerate(files):
rfs = proc.getUploader(i)
try:
with open(fobj, 'rb') as f:
print ('Uploading: %s' % fobj)
offset = 0
block = []
rfs.write(block, offset, len(block)) # Touch
while True:
block = f.read(1000 * 1000)
if not block:
break
rfs.write(block, offset, len(block))
offset += len(block)
ret_val.append(client.sha1(fobj))
finally:
rfs.close()
return ret_val
def create_upload_fileset(conn, client, fs_path):
"""create a Fileset but don't import"""
mrepo = client.getManagedRepository()
files = get_files_for_fileset(fs_path)
assert files, 'No files found: %s' % fs_path
fileset = create_fileset(files)
settings = create_settings()
mrepo = client.getManagedRepository()
proc = mrepo.importFileset(fileset, settings)
hashes = []
try:
hashes = upload_files(proc, files, client)
finally:
proc.close()
if len(hashes) == 0:
print("No files uploaded!")
# Use first hash to get Fileset...
# first File will typically be the path/to/image.zarr/.zattrs
return get_fileset_from_hash(conn, hashes[0])
def get_fileset_from_hash(conn, hash):
params = omero.sys.ParametersI()
params.addString("hash", hash)
query = 'select fse '\
'from FilesetEntry as fse '\
'join fse.fileset as fileset '\
'join fetch fse.originalFile as ofile '\
'where ofile.hash=:hash '\
'order by fileset.id'
rows = conn.getQueryService().findAllByQuery(query, params, conn.SERVICE_OPTS)
# If this file has been imported before, multiple Filesets will be found
# return last one (most recent)
last_fse = rows[-1]
fileset_id = last_fse.fileset.id.val
orig_file = last_fse.originalFile
print('orig_file', orig_file.id.val)
print('orig_file', orig_file.getPath().val)
print('orig_file', orig_file.getName().val)
return fileset_id, orig_file
def main(argv):
parser = argparse.ArgumentParser()
parser.add_argument('--replacefileset', type=int, help=(
'Replace this Fileset...'))
parser.add_argument('path', nargs='+', help='Files or directories')
args = parser.parse_args(argv)
with cli_login() as cli:
conn = BlitzGateway(client_obj=cli._client)
for fs_path in args.path:
old_fs = None
if args.replacefileset:
old_fs = conn.getObject('Fileset', args.replacefileset)
if old_fs is None:
print ('Fileset id not found: %s' % args.replacefileset)
sys.exit(1)
print ('Importing: %s' % fs_path)
fileset_id, orig_file = create_upload_fileset(conn, cli._client, fs_path)
if old_fs is not None:
for image in old_fs.copyImages():
print("Updating image", image.name, image.id)
img = image._obj
img.fileset = omero.model.FilesetI(fileset_id, False)
conn.getUpdateService().saveObject(img, conn.SERVICE_OPTS)
# Print the HQL updates we need to update each Pixels to new Fileset file
fname = orig_file.getName().val
fpath = orig_file.getPath().val
pid = image.getPixelsId()
print(f"""psql -U postgres -d OMERO-server -c "UPDATE pixels SET name = '{fname}', path = '{fpath}' where id = {pid}""")
# delete old fileset...
print("Deleting Fileset", old_fs.id)
conn.deleteObjects("Fileset", [old_fs.id])
if __name__ == '__main__':
main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment