Skip to content

Instantly share code, notes, and snippets.

@will-moore
Last active September 28, 2021 09:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save will-moore/d49252600b4a3dce26468012f6fcfe3b to your computer and use it in GitHub Desktop.
Save will-moore/d49252600b4a3dce26468012f6fcfe3b to your computer and use it in GitHub Desktop.
OMERO.script that loads the Original Metadata of each Image (read via Bio-Formats) and attaches it to the Image as a text file, so that the metadata is indexed by the search engine.
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Create File Annotations from Original Metadata.
See https://www.openmicroscopy.org/community/viewtopic.php?f=4&t=1226
"""
import omero
import omero.scripts as scripts
from omero.gateway import BlitzGateway
from omero.rtypes import rstring, rlong, wrap
from cStringIO import StringIO
NAMESPACE = "original_metadata.key_values.txt"
def create_original_file_from_file_obj(
        conn, fo, path, name, file_size, mimetype=None, ns=None):
    """
    Create an Original File from a file object and upload its contents.

    This is similar to the same method from Blitz Gateway, but takes
    a file object instead of a file path.

    @param conn:        The BlitzGateway connection
    @param fo:          Seekable, readable file-like object holding the data
    @param path:        Value stored as the path of the Original File
    @param name:        Value stored as the name of the Original File
    @param file_size:   Amount of data to read from fo and upload
    @param mimetype:    Optional mimetype; only set when truthy
    @param ns:          Not used by this function
    @return:            The object returned by the raw file store's save()
    """
    raw_file_store = conn.createRawFileStore()
    # Everything after the store is opened runs under try/finally so the
    # store is closed even when saving or uploading raises.
    try:
        # create original file, set name, path, mimetype
        original_file = omero.model.OriginalFileI()
        original_file.setName(rstring(name))
        original_file.setPath(rstring(path))
        if mimetype:
            original_file.mimetype = rstring(mimetype)
        original_file.setSize(rlong(file_size))

        # set sha1 # ONLY for OMERO-4
        try:
            import hashlib
            hash_sha1 = hashlib.sha1
        except ImportError:
            # Very old interpreters without hashlib
            import sha
            hash_sha1 = sha.new
        try:
            fo.seek(0)
            h = hash_sha1()
            h.update(fo.read())
            original_file.setSha1(rstring(h.hexdigest()))
        except Exception:
            # Deliberately best-effort: OMERO-5 doesn't need this
            pass

        upd = conn.getUpdateService()
        original_file = upd.saveAndReturnObject(
            original_file, conn.SERVICE_OPTS)

        # upload file in fixed-size chunks
        fo.seek(0)
        raw_file_store.setFileId(
            original_file.getId().getValue(), conn.SERVICE_OPTS)
        buf = 10000
        # int() instead of long(): identical on Python 2, and long does
        # not exist on Python 3
        for pos in range(0, int(file_size), buf):
            # The final chunk may be shorter than buf
            block_size = min(buf, file_size - pos)
            fo.seek(pos)
            block = fo.read(block_size)
            raw_file_store.write(block, pos, block_size, conn.SERVICE_OPTS)
        original_file = raw_file_store.save(conn.SERVICE_OPTS)
    finally:
        raw_file_store.close()
    return original_file
def handle_image(conn, img, params):
    """
    Attach an image's original metadata to it as a text File Annotation.

    Images that already carry an annotation in NAMESPACE, or that have no
    original metadata, are skipped with a printed message.

    @param conn:    The BlitzGateway connection
    @param img:     The ImageWrapper object
    @param params:  The script parameters (not read by this function)
    """
    if img.getAnnotation(NAMESPACE):
        # Image has previously been processed
        print(" File with namespace %s already exists" % NAMESPACE)
        return

    om = img.loadOriginalMetadata()
    update = conn.getUpdateService()

    lines = []
    if om is not None:
        # om[1] is the global metadata, om[2] the series metadata;
        # entries without a value are skipped
        lines = [
            "%s = %s" % (pair[0], pair[1])
            for pair in (om[1] + om[2])
            if len(pair) > 1
        ]

    if not lines:
        print(" No original metadata found")
        return

    # Build the "key = value" text in memory and upload it
    text = "\n".join(lines)
    buffer = StringIO()
    buffer.write(text)
    orig_file = create_original_file_from_file_obj(
        conn, buffer, '', "original_metadata_key_values.txt", len(text),
        mimetype="text/plain")

    # Wrap the uploaded file in a File Annotation tagged with NAMESPACE
    annotation = omero.model.FileAnnotationI()
    annotation.setFile(omero.model.OriginalFileI(orig_file.id.val, False))
    annotation.setNs(wrap(NAMESPACE))
    annotation = update.saveAndReturnObject(annotation, conn.SERVICE_OPTS)

    # Link to Image
    link = omero.model.ImageAnnotationLinkI()
    link.parent = omero.model.ImageI(img.getId(), False)
    link.child = omero.model.FileAnnotationI(annotation.id.val, False)
    update.saveAndReturnObject(link, conn.SERVICE_OPTS)
def process_images(conn, params):
    """
    Collect the images under the requested containers and handle each one.

    @param conn:    The BlitzGateway connection
    @param params:  The script parameters; 'Data_Type' selects 'Dataset'
                    or 'Project', and 'IDs' lists the container IDs
    @return:        List of all images found (empty for any other
                    'Data_Type')
    """
    data_type = params['Data_Type']
    if data_type == 'Dataset':
        datasets = conn.getObjects("Dataset", params['IDs'])
    elif data_type == 'Project':
        # Lazily walk each project's datasets in order
        datasets = (
            dataset
            for project in conn.getObjects("Project", params['IDs'])
            for dataset in project.listChildren()
        )
    else:
        datasets = ()

    images = [
        image for dataset in datasets for image in dataset.listChildren()
    ]
    print("Found %s images..." % len(images))

    for image in images:
        print("Processing Image: %s ID: %s" % (image.getName(), image.getId()))
        handle_image(conn, image, params)

    return images
def runAsScript():
    """
    The main entry point of the script.

    Called by the client via the scripting service, passing the required
    parameters. Declares the script's inputs, processes the selected
    Projects or Datasets and reports the result in the "Message" output.
    The client session is always closed, even if processing fails.
    """
    data_types = [rstring('Project'), rstring('Dataset')]

    client = scripts.client(
        'Attach_Original_Metadata.py',
        "Create a File Annotation from the Original Metadata.\n",
        scripts.String(
            "Data_Type", optional=False, grouping="1",
            description="Choose source of images",
            values=data_types, default="Dataset"),
        scripts.List(
            "IDs", optional=False, grouping="2",
            description="List of Project or Dataset IDs").ofType(rlong(0)),
    )

    try:
        conn = BlitzGateway(client_obj=client)

        # Process the list of args above.
        params = {}
        for key in client.getInputKeys():
            if client.getInput(key):
                params[key] = client.getInput(key, unwrap=True)
        print(params)

        # Call the main script - returns the list of images found
        results = process_images(conn, params)
        msg = "Processed %s images" % len(results)
        print(msg)
        client.setOutput("Message", rstring(msg))
    finally:
        # Release the session whether or not processing succeeded
        client.closeSession()
if __name__ == "__main__":
    # Python entry point: run the script when this file is executed directly.
    runAsScript()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment