''' Test to crop all tiles in a region
import argparse
import pathlib
import urllib
import struct
import json
import sys
import ssl
import re
import os
import io
import skimage.exposure
import numpy as np
from minerva_lib import crop
# Omero API
'Cookie': os.environ['OME_COOKIE'],
ssl._create_default_https_context = ssl._create_unverified_context
''' Test to crop all tiles in a region
import argparse
import pathlib
import urllib
import struct
import botocore
import boto3
import json
import sys
import ssl
import re
import os
import skimage.exposure
import numpy as np
from minerva_lib import crop
import metadata as metadata_xml
import xml.etree.ElementTree as ET
from aws_srp import AWSSRP
s3 = boto3.resource('s3')
# Minerva API
ssl._create_default_https_context = ssl._create_unverified_context
class minerva():
def image(uuid, token, c, limit, **kwargs):
''' Load a single channel by pattern
uuid: Minerva image identifier
token: AWS Cognito Id Token
c: zero-based channel index
limit: max image pixel value
args: dict with following keys
{x, y, z, t, level}
numpy array loaded from file
def format_channel(c):
return f'{c},FFFFFF,0,1'
url = 'https://ba7xgutvbc.execute-api.'
url += ''
url += '{0}/render-tile/{x}/{y}/{z}/{t}/{l}/'.format(uuid,
url += format_channel(c)
req = urllib.request.Request(url, headers={
'Authorization': token,
'Accept': 'image/png'
with urllib.request.urlopen(req) as f:
return[:, :, 0]
except urllib.error.HTTPError as e:
print(e, file=sys.stderr)
return None
return None
def index(uuid, token):
'''Find all the file paths in a range
image_id: the id of image in minerva
token: AWS Cognito Id Token
indices: size in channels, times, LOD, Z, Y, X
tile: image tile size in pixels: y, x
limit: max image pixel value
metadata_file = 'metadata.xml'
bucket = 'minerva-test-cf-common-tilebucket-yhuku9umej1s'
url = 'https://ba7xgutvbc.execute-api.'
url += f'{uuid}'
req = urllib.request.Request(url, headers={
'Authorization': token
with urllib.request.urlopen(req) as f:
result = json.loads(
prefix = result['data']['bfu_uuid']
except urllib.error.HTTPError as e:
print(e, file=sys.stderr)
print(bucket, prefix, metadata_file)
obj = s3.Object(bucket, f'{prefix}/{metadata_file}')
root_xml = obj.get()['Body'].read().decode('utf-8')
root = ET.fromstring(root_xml)
config = metadata_xml.parse_image(root, uuid)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
print("The object does not exist.", file=sys.stderr)
dtype = config['meta']['pixelsType']
tw, th = map(config['tile_size'].get,
('width', 'height'))
w, h, c, t, z = map(config['size'].get,
('width', 'height', 'c', 't', 'z'))
y = int(np.ceil(h / th))
x = int(np.ceil(w / tw))
return {
'limit': np.iinfo(getattr(np, dtype)).max,
'levels': config['levels'],
'tile_size': [th, tw],
'ctxy': [c, t, x, y],
class api():
def scaled_region(url, uuid, token):
""" Just parse the rendered_scaled_region API
url: "<matching OMERO.figure API>"
uuid: Minerva image identifier
token: AWS Cognito Id Token
Return Keywords:
iid: image id
t: integer timestep
z: integer z position in stack
max_size: maximum extent in x or y
integer [x, y]
[width, height]
chan: integer N channels by 1 index
r: float32 N channels by 2 min, max
c: float32 N channels by 3 red, green, blue
indices: size in channels, times, LOD, Z, Y, X
tile: image tile size in pixels: y, x
limit: max image pixel value
url_match ='render_scaled_region', url)
url = url[(lambda x: x.end() if x else 0)(url_match):]
if url[0] == '/':
url = url[1:]
def parse_channel(c):
cid, _min, _max, _hex = re.split('[:|$]', c)
hex_bytes = bytearray.fromhex(_hex)
return {
'min': int(_min),
'max': int(_max),
'shown': int(cid) > 0,
'cid': abs(int(cid)) - 1,
'color': struct.unpack('BBB', hex_bytes)
def parse_region(r):
return list(map(float, r.split(',')))
iid, z, t = url.split('?')[0].split('/')[:3]
query = url.split('?')[1]
parameters = {}
# Make parameters dicitonary
for param in query.split('&'):
key, value = param.split('=')
parameters[key] = value
max_size = parameters.get('max_size', 2000)
channels = parameters['c'].split(',')
region = parameters['region']
# Extract data from API
channels = list(map(parse_channel, channels))
x, y, width, height = parse_region(region)
# Reformat data from API
chans = [c for c in channels if c['shown']]
shape = np.array([width, height])
origin = np.array([x, y])
# Make API request to interpret url
meta = minerva.index(uuid, token)
def get_range(chan):
r = np.array([chan['min'], chan['max']])
return np.clip(r / meta['limit'], 0, 1)
def get_color(chan):
c = np.array(chan['color']) / 255
return np.clip(c, 0, 1)
return {
'ctxy': meta['ctxy'],
'limit': meta['limit'],
'levels': meta['levels'],
'tile_size': meta['tile_size'],
'r': np.array([get_range(c) for c in chans]),
'c': np.array([get_color(c) for c in chans]),
'chan': np.int64([c['cid'] for c in chans]),
'max_size': int(max_size),
'origin': origin,
'shape': shape,
'iid': int(iid),
't': int(t),
'z': int(z)
# Minerva API
def format_input(args):
''' Combine all parameters
id_, color_, range_ = args
return {
'channel': id_,
'color': color_,
'min': range_[0],
'max': range_[1],
def do_crop(load_tile, channels, tile_size, full_origin, full_size,
levels=1, max_size=2000):
''' Interface with minerva_lib.crop
load_tile: Function to supply 2D numpy array
channels: List of dicts of channel rendering settings
tile_size: The width, height of a single tile
full_origin: Request's full-resolution x, y origin
full_size: Request's full-resolution width, height
levels: The number of pyramid levels
max_size: The maximum response width or height
2D numpy float array of with width, height given by
`full_size` if `full_size <= max_size` or width, height given by
`full_size / 2 ** l <= max_size` for the lowest `l` meeting
`l < levels`. The array is a composite of all channels
for full or partial tiles within `full_size` from `full_origin`.
level = crop.get_optimum_pyramid_level(full_size, levels, max_size)
crop_origin = crop.scale_by_pyramid_level(full_origin, level)
crop_size = crop.scale_by_pyramid_level(full_size, level)
print(f'Cropping 1/{level} scale')
image_tiles = []
for channel in channels:
(red, green, blue) = channel['color']
_id = channel['channel']
_min = channel['min']
_max = channel['max']
for indices in crop.select_tiles(tile_size, crop_origin, crop_size):
(i, j) = indices
# Disallow negative tiles
if i < 0 or j < 0:
# Load image from Minerva
image = load_tile(_id, level, i, j)
# Disallow empty images
if image is None:
# Add to list of tiles
'min': _min,
'max': _max,
'image': image,
'indices': (i, j),
'color': (red, green, blue),
return crop.stitch_tiles(image_tiles, tile_size, crop_origin, crop_size)
# Entrypoint
def main(args):
""" Crop a region
# Read from a configuration file at a default location
cmd = argparse.ArgumentParser(
description="Crop a region"
default_url = '/548111/0/0/?c=1|0:65535$FF0000'
default_url += '&region=0,0,1024,1024'
'url', nargs='?', default=default_url,
help='OMERO.figure render_scaled_region url'
'-o', default=str(pathlib.Path.cwd()),
help='output directory'
parsed = cmd.parse_args(args)
out_file = str(pathlib.Path(parsed.o, 'out.png'))
# Set up AWS Authentication
username = os.environ['MINERVA_USERNAME']
except KeyError:
print('must have MINERVA_USERNAME in environ', file=sys.stderr)
password = os.environ['MINERVA_PASSWORD']
except KeyError:
print('must have MINERVA_PASSWORD in environ', file=sys.stderr)
minerva_pool = 'us-east-1_ecLW78ap5'
minerva_client = '7gv29ie4pak64c63frt93mv8lq'
uuid = '769cfb14-f583-4f22-9f48-94a24e09fd7f'
srp = AWSSRP(username, password, minerva_pool, minerva_client)
result = srp.authenticate_user()
token = result['AuthenticationResult']['IdToken']
# Read parameters from URL and API
keys = api.scaled_region(parsed.url, uuid, token)
# Make array of channel parameters
inputs = zip(keys['chan'], keys['c'], keys['r'])
channels = map(format_input, inputs)
# OMERO loads the tiles
def ask_minerva(c, l, i, j):
keywords = {
't': 0,
'z': 0,
'l': l,
'x': i,
'y': j
limit = keys['limit']
return minerva.image(uuid, token, c, limit, **keywords)
# Minerva does the cropping
out = do_crop(ask_minerva, channels, keys['tile_size'],
keys['origin'], keys['shape'], keys['levels'],
# Write the image buffer to a file
try:, np.uint8(255 * out))
except OSError as o_e:
if __name__ == "__main__":
class omero():
def image(c, limit, *args):
''' Load a single channel by pattern
c: zero-based channel index
limit: max image pixel value
args: tuple completing pattern
numpy array loaded from file
def format_channel(c):
api_c = c + 1
selected = '{}|0:{}$000000'.format(api_c, limit)
filler = [str(-i) for i in range(1, api_c)] + ['']
return ','.join(filler) + selected
url = ''
url += '{}/{}/{}/?m=g&format=tif&tile={},{},{}'.format(*args)
url += '&c=' + format_channel(c)
req = urllib.request.Request(url, headers=HEADERS)
with urllib.request.urlopen(req) as response:
f = io.BytesIO(
return[:, :, 0]
except urllib.error.HTTPError as e:
return None
return None
def index(image_id):
'''Find all the file paths in a range
image_id: the id of image in omero
indices: size in channels, times, LOD, Z, Y, X
tile: image tile size in pixels: y, x
limit: max image pixel value
config = {}
url = ''
url += 'imgData/{}'.format(image_id)
req = urllib.request.Request(url, headers=HEADERS)
with urllib.request.urlopen(req) as response:
config = json.loads(
dtype = config['meta']['pixelsType']
tw, th = map(config['tile_size'].get,
('width', 'height'))
w, h, c, t, z = map(config['size'].get,
('width', 'height', 'c', 't', 'z'))
y = int(np.ceil(h / th))
x = int(np.ceil(w / tw))
return {
'limit': np.iinfo(getattr(np, dtype)).max,
'levels': config['levels'],
'tile_size': [th, tw],
'ctxy': [c, t, x, y],
class api():
def scaled_region(url):
""" Just parse the rendered_scaled_region API
url: "<matching OMERO.figure API>"
Return Keywords:
iid: image id
t: integer timestep
z: integer z position in stack
max_size: maximum extent in x or y
integer [x, y]
[width, height]
chan: integer N channels by 1 index
r: float32 N channels by 2 min, max
c: float32 N channels by 3 red, green, blue
indices: size in channels, times, LOD, Z, Y, X
tile: image tile size in pixels: y, x
limit: max image pixel value
url_match ='render_scaled_region', url)
url = url[(lambda x: x.end() if x else 0)(url_match):]
if url[0] == '/':
url = url[1:]
def parse_channel(c):
cid, _min, _max, _hex = re.split('[:|$]', c)
hex_bytes = bytearray.fromhex(_hex)
return {
'min': int(_min),
'max': int(_max),
'shown': int(cid) > 0,
'cid': abs(int(cid)) - 1,
'color': struct.unpack('BBB', hex_bytes)
def parse_region(r):
return list(map(float, r.split(',')))
iid, z, t = url.split('?')[0].split('/')[:3]
query = url.split('?')[1]
parameters = {}
# Make parameters dicitonary
for param in query.split('&'):
key, value = param.split('=')
parameters[key] = value
max_size = parameters.get('max_size', 2000)
channels = parameters['c'].split(',')
region = parameters['region']
# Extract data from API
channels = list(map(parse_channel, channels))
x, y, width, height = parse_region(region)
# Reformat data from API
chans = [c for c in channels if c['shown']]
shape = np.array([width, height])
origin = np.array([x, y])
# Make API request to interpret url
meta = omero.index(iid)
def get_range(chan):
r = np.array([chan['min'], chan['max']])
return np.clip(r / meta['limit'], 0, 1)
def get_color(chan):
c = np.array(chan['color']) / 255
return np.clip(c, 0, 1)
return {
'ctxy': meta['ctxy'],
'limit': meta['limit'],
'levels': meta['levels'],
'tile_size': meta['tile_size'],
'r': np.array([get_range(c) for c in chans]),
'c': np.array([get_color(c) for c in chans]),
'chan': np.int64([c['cid'] for c in chans]),
'max_size': int(max_size),
'origin': origin,
'shape': shape,
'iid': int(iid),
't': int(t),
'z': int(z)
# Minerva API
def format_input(args):
''' Combine all parameters
id_, color_, range_ = args
return {
'channel': id_,
'color': color_,
'min': range_[0],
'max': range_[1],
def do_crop(load_tile, channels, tile_size, full_origin, full_size,
levels=1, max_size=2000):
''' Interface with minerva_lib.crop
load_tile: Function to supply 2D numpy array
channels: List of dicts of channel rendering settings
tile_size: The width, height of a single tile
full_origin: Request's full-resolution x, y origin
full_size: Request's full-resolution width, height
levels: The number of pyramid levels
max_size: The maximum response width or height
2D numpy float array of with width, height given by
`full_size` if `full_size <= max_size` or width, height given by
`full_size / 2 ** l <= max_size` for the lowest `l` meeting
`l < levels`. The array is a composite of all channels
for full or partial tiles within `full_size` from `full_origin`.
level = crop.get_optimum_pyramid_level(full_size, levels, max_size)
crop_origin = crop.scale_by_pyramid_level(full_origin, level)
crop_size = crop.scale_by_pyramid_level(full_size, level)
print(f'Cropping 1/{level} scale')
image_tiles = []
for channel in channels:
(red, green, blue) = channel['color']
_id = channel['channel']
_min = channel['min']
_max = channel['max']
for indices in crop.select_tiles(tile_size, crop_origin, crop_size):
(i, j) = indices
# Disallow negative tiles
if i < 0 or j < 0:
# Load image from Omero
image = load_tile(_id, level, i, j)
# Disallow empty images
if image is None:
# Add to list of tiles
'min': _min,
'max': _max,
'channel': _id,
'image': image,
'indices': (i, j),
'color': (red, green, blue),
return crop.stitch_tiles(image_tiles, tile_size, crop_origin, crop_size)
# Entrypoint
def main(args):
""" Crop a region
# Read from a configuration file at a default location
cmd = argparse.ArgumentParser(
description="Crop a region"
default_url = '/548111/0/0/?c=1|0:65535$FF0000,3|0:65535$0000FF'
default_url += '&region=-8160,-16416,48960,48960'
'url', nargs='?', default=default_url,
help='OMERO.figure render_scaled_region url'
'-o', default=str(pathlib.Path.cwd()),
help='output directory'
parsed = cmd.parse_args(args)
out_file = str(pathlib.Path(parsed.o, 'out.png'))
# Read parameters from URL and API
keys = api.scaled_region(parsed.url)
# Make array of channel parameters
inputs = zip(keys['chan'], keys['c'], keys['r'])
channels = map(format_input, inputs)
# OMERO loads the tiles
def ask_omero(c, l, i, j):
return omero.image(c, keys['limit'], keys['iid'],
keys['z'], keys['t'], l, i, j)
# Minerva does the cropping
out = do_crop(ask_omero, channels, keys['tile_size'],
keys['origin'], keys['shape'], keys['levels'],
# Write the image buffer to a file
try:, np.uint8(255 * out))
except OSError as o_e:
if __name__ == "__main__":
''' Create imgData result from metadata.xml
from functools import reduce
import xml.etree.ElementTree as ET
import json
XSD = {
'ome': ''
def make_channel(channel, keys):
''' Create a channel dictionary for imgData request
chan = channel.attrib
id_label = chan['ID']
emission = float(chan['EmissionWavelength'])
return {
'label': id_label,
'window': {
'min': keys['min'],
'start': keys['min'],
'end': keys['max'],
'max': keys['max'],
'emissionWave': emission,
'reverseIntensity': False,
'inverted': False,
'coefficient': 1,
'family': 'linear',
'color': 'FFFFFF',
'active': True,
def factor_pairs(count):
''' Yield factor pairs, sorted by smallest difference
for i in range(int(count ** 0.5), 0, -1):
if count % i == 0:
yield i, count // i
yield 0, 0
def make_grid(count, width, height):
''' Spatially arrange images of given shape
border = 2
gridx, gridy = next(factor_pairs(count))
return {
'gridx': gridx,
'gridy': gridy,
'border': border,
'width': border + gridx * (border + width),
'height': border + gridy * (border + height),
def make_scale(scales, i):
''' Assign i to ith division by 2
scales[str(i)] = 1.0 / (2 ** i)
return scales
def get_image(uuid):
''' Simulate database result
return {
'pyramid_levels': 1,
'uuid': uuid
def get_uuid(attributes):
''' Get the UUID from an XML attributes dictionary
return attributes['ID'].split(':').pop()
def make_meta(props):
''' Make metadata from XML attributes
img = props['Image']
pix = props['Pixels']
return {
'imageId': get_uuid(img),
'imageName': img['Name'],
'pixelsType': pix['Type'],
'imageAuthor': 'Minerva',
'projectDescription': '',
'datasetDescription': '',
'imageDescription': '',
'imageTimestamp': '',
'wellSampleId': '',
'datasetName': '',
'projectName': '',
'projectId': '',
'datasetId': '',
'wellId': '',
def make_image(image, props, channels, keys):
''' Make imgData dictionary from attriutes
pix = props['Pixels']
plane = props['Plane']
size = {
'c': int(pix['SizeC']),
't': int(pix['SizeT']),
'z': int(pix['SizeZ']),
'width': int(pix['SizeX']),
'height': int(pix['SizeY']),
levels = int(image['pyramid_levels'])
scales = reduce(make_scale, range(levels), {})
return {
'size': size,
'levels': levels,
'channels': channels,
'meta': make_meta(props),
'zoomLevelScaling': scales,
'pixel_range': [
'pixel_size': {
'x': float(pix['PhysicalSizeX']),
'y': float(pix['PhysicalSizeY']),
'z': float(pix['PhysicalSizeZ']),
'deltaT': [
'split_channel': {
'g': make_grid(size['c'], size['width'], size['height']),
'c': make_grid(size['c'] + 1, size['width'], size['height'])
'id': image['uuid'],
'tile_size': {
'width': 1024,
'height': 1024,
'init_zoom': 0,
'tiles': True,
'interpolate': True,
'perms': {
'canAnnotate': False,
'canDelete': False,
'canEdit': False,
'canLink': False,
'rdefs': {
'defaultT': 0,
'defaultZ': 0,
'model': 'color',
'invertAxis': False,
'projection': 'normal',
def parse_image(ome, image_uuid):
''' Make imgData dictionary from metadata / db
image_id = f'@ID="Image:{image_uuid}"'
e_image = ome.find(f'ome:Image[{image_id}]', XSD)
if not e_image:
return {}
e_pixels = e_image.find('ome:Pixels', XSD)
e_plane = e_pixels.find('ome:Plane', XSD)
e_channels = e_pixels.findall('ome:Channel', XSD)
image = get_image(image_uuid)
props = {
'Pixels': e_pixels.attrib,
'Plane': e_plane.attrib,
'Image': e_image.attrib,
keys = {
'min': 0,
'max': 2 ** int(props['Pixels']['SignificantBits'])
channels = [make_channel(c, keys) for c in e_channels]
return make_image(image, props, channels, keys)
def main():
''' write imgData json file
image_uuid = 'b9e36f16-75a3-4a11-be88-ed838c3b9141'
metadata_file = 'metadata.xml'
imgdata_file = 'imgdata.json'
root = ET.parse(metadata_file).getroot()
imgdata = parse_image(root, image_uuid)
with open(imgdata_file, 'w') as idf:
json.dump(imgdata, idf)
if __name__ == '__main__':
thejohnhoffer commented Jun 29, 2018


  • pip install -r requirements.txt
  • Export your Minerva username and password

The most important function

Here is what it does:

  • It calls crop.get_optimum_pyramid_level to get level
    • Using level, It calls crop.scale_by_pyramid_level to scale the origin and size
  • It gets all the tile indices from crop.select_tiles
    • It loads all the images into image_tiles
  • It stitches and composites with crop.stitch_tiles

This function is do_crop on line 215.


python [-h] [-o <OUTPUT/FOLDER>] [<URL>]

Default <URL> is:

To combine channels 0 and 2, one would use a url like this:

Default <OUTPUT/FOLDER> is the current directory:
The output is always called out.png

