"""Utilities to read and write to USID HDF5 files from HyperSpy Signal objects."""
import os
from warnings import warn
from functools import partial
import collections
import h5py
import numpy as np
import dask.array as da
from hyperspy.signals import BaseSignal, ComplexSignal
import pyUSID as usid
# Plugin characteristics
# ----------------------
# Module-level constants describing this I/O plugin.
format_name = 'USID'
description = (
    'Data structured according to the Universal Spectroscopic and Imaging Data (USID) model written into '
    'Hierarchical Data Format (HDF5) files'
)
full_support = False

# Recognised file extension
file_extensions = ['hdf5']
default_extension = 0  # index into file_extensions

# Reading capabilities
reads_images = reads_spectrum = reads_spectrum_image = True

# Writing capabilities
writes_images = writes_spectrum = writes_spectrum_image = True
writes = True

# No version string is declared for this plugin
version = ""
# ######################## UTILITIES THAT SIMPLIFY READING FROM H5USID FILES ###########################################
def __get_dim_dict(labels, units, val_func, ignore_non_linear_dims=True):
    """
    Build HyperSpy-style axis dictionaries for every non-singular dimension.

    Parameters
    ----------
    labels : iterable of str
        Names of the dimensions.
    units : iterable of str
        Units of the dimensions, in the same order as ``labels``.
    val_func : callable
        Called as ``val_func(dim_name)``. Must return the sequence of values
        over which the named dimension was varied.
    ignore_non_linear_dims : bool, Optional. Default = True
        If True, a dimension whose values are not evenly spaced only triggers
        a warning and is described with a scale of 1 and an offset of 0.
        If False, such a dimension raises a ValueError.

    Returns
    -------
    dict
        Maps each dimension name to a dictionary with the keys ``'size'``,
        ``'name'``, ``'units'``, ``'scale'`` and ``'offset'``. Dimensions
        that hold a single value are left out.

    Raises
    ------
    ValueError
        If a dimension is non-linear and ``ignore_non_linear_dims`` is False.
    """
    dim_dict = dict()
    # The loop variable is intentionally not called ``units`` so that it does
    # not shadow the parameter being iterated over.
    for dim_name, dim_units in zip(labels, units):
        dim_vals = val_func(dim_name)
        if len(dim_vals) == 1:
            # Empty (singular) dimension - nothing to describe
            continue

        offset = dim_vals[0]
        unique_steps = np.unique(np.diff(dim_vals))
        step_size = unique_steps[0]
        if len(unique_steps) > 1:
            # We often end up here because of floating point noise in
            # otherwise evenly spaced values. Tolerate a small relative spread.
            ref_step = unique_steps.max()
            spread = np.mean(np.abs(unique_steps - ref_step))
            # abs() keeps the relative test meaningful for descending
            # dimensions (negative steps); a zero reference step can never be
            # considered linear here since the steps are known to differ.
            if ref_step != 0 and spread / abs(ref_step) < 1E-3:
                step_size = ref_step
            elif ignore_non_linear_dims:
                warn('Ignoring non-linearity of dimension: {}'.format(dim_name))
                # Fall back to a plain index axis without touching the array
                # handed back by val_func
                step_size = 1
                offset = 0
            else:
                raise ValueError('Cannot load provided dataset. '
                                 'Parameter: {} was varied non-linearly'.format(dim_name))

        dim_dict[dim_name] = {'size': len(dim_vals),
                              'name': dim_name,
                              'units': dim_units,
                              'scale': step_size,
                              'offset': offset}
    return dim_dict
def __assemble_dim_list(dim_dict, dim_names):
    """
    Collect the entries of ``dim_dict`` in the order given by ``dim_names``.

    Parameters
    ----------
    dim_dict : dict
        Maps dimension names to their descriptions.
    dim_names : iterable of str
        Dimension names in the desired order.

    Returns
    -------
    list
        Values of ``dim_dict`` for those names that are present in it, in the
        order of ``dim_names``. Names missing from ``dim_dict`` are skipped
        silently.
    """
    # NOTE(review): the loop body was missing from the file; only the
    # ``except KeyError`` handler survived, which implies that unknown names
    # are skipped rather than reported.
    return [dim_dict[dim_name] for dim_name in dim_names if dim_name in dim_dict]
def __split_descriptor(desc):
    """
    Split a string such as "Quantity (units)" or "Quantity [units]".

    Parameters
    ----------
    desc : str
        Descriptor holding a quantity optionally followed by units enclosed
        in round or square brackets.

    Returns
    -------
    quant : str
        Text before the last opening bracket, stripped of whitespace. If no
        bracket is present, the whole (stripped) descriptor is returned so
        that the quantity is not lost.
    units : str
        Text from the last opening bracket onwards with all round and square
        brackets removed. Empty if no bracket is present.
    """
    desc = desc.strip()
    # Round brackets take precedence; square brackets are the fallback
    ind = desc.rfind('(')
    if ind < 0:
        ind = desc.rfind('[')
    if ind < 0:
        return desc, ''

    quant = desc[:ind].strip()
    # Remove every bracket character in a single pass
    units = desc[ind:].translate(str.maketrans('', '', '()[]')).strip()
    return quant, units
def __convert_to_hs_signal(ndim_form, quantity, units, converter, dim_dict_list, spec_dim_names,
                           h5_path, h5_dset_path, sig_type='', verbose=False):
    """
    Create a signal from N-dimensional data and attach USID related metadata.

    Parameters
    ----------
    ndim_form : array-like
        Data in its N-dimensional form.
    quantity : str
        Physical quantity stored in the data. Written to the signal metadata.
    units : str
        Units of the data. Written to the signal metadata.
    converter : callable
        Called as ``converter(ndim_form, axes=dim_dict_list)`` to create the
        signal object.
    dim_dict_list : list of dict
        Axis descriptions handed to ``converter``.
    spec_dim_names : list
        Identifiers of the spectroscopic dimensions. If empty, the signal is
        transposed; otherwise it is passed to ``as_signal2D``.
    h5_path : str
        Stored in the metadata as ``original_filename``.
    h5_dset_path : str
        Stored in the metadata as ``dataset_path``.
    sig_type : str, Optional. Default = ''
        Stored in the metadata as ``signal_type``.
    verbose : bool, Optional. Default = False
        Whether or not to print debugging statements.

    Returns
    -------
    Signal object created by ``converter`` after separation of dimensions.
    """
    sig = converter(ndim_form, axes=dim_dict_list)
    # Embed the quantity and units metadata before we lose them:
    sig.metadata.Signal.add_dictionary({'quantity': quantity,
                                        'units': units,
                                        'signal_type': sig_type})
    sig.metadata.General.add_dictionary({'original_filename': h5_path,
                                         'dataset_path': h5_dset_path,
                                         'original_file_type': 'USID HDF5',
                                         'pyUSID_version': usid.__version__})
    if verbose:
        print('Signal immidiately after casting:')
        # NOTE(review): the statement following this header was missing from
        # the file; printing the signal is the presumed intent.
        print(sig)

    # The two branches are mutually exclusive: as_signal2D must not be asked
    # for an empty set of axes.
    if len(spec_dim_names) == 0:
        if verbose:
            print('No Spectroscopic dimensions - so transposing')
        sig = sig.transpose()
    else:
        if verbose:
            print('Explicitely stating spec dims')
        sig = sig.as_signal2D(spec_dim_names)

    if verbose:
        print('Signal after separation of dimensions:')
        # NOTE(review): presumed, as above
        print(sig)
    return sig
def usidataset_to_signal(h5_main, verbose=False, ignore_non_linear_dims=True):
    """
    Converts a single specified USIDataset object to one or more Signal objects

    Parameters
    ----------
    h5_main : pyUSID.USIDataset object
        USID Main dataset
    verbose : bool, Optional. Default = False
        Whether or not to print debugging statements
    ignore_non_linear_dims : bool, Optional. Default = True
        If True, parameters that were varied non-linearly in the desired
        dataset only cause a warning and are treated as linearly varied
        parameters so that a Signal object can still be generated.
        If False, such parameters result in a ValueError.

    Returns
    -------
    list of hyperspy.signals.BaseSignal objects. USIDatasets with compound datatypes are broken down to multiple
    Signal objects.

    Raises
    ------
    ValueError
        If the dataset could not be reshaped to its N-dimensional form, or if
        a dimension is non-linear and ``ignore_non_linear_dims`` is False.
    """
    h5_main = usid.USIDataset(h5_main)
    # TODO: Cannot handle data without N-dimensional form

    # First get dictionary of axes that HyperSpy likes to see. Ignore singular dimensions
    # NOTE(review): the trailing arguments of both calls below were missing
    # from the file. They are reconstructed from the signature of
    # __get_dim_dict (val_func, ignore_non_linear_dims) - confirm that
    # get_pos_values / get_spec_values are the intended value getters.
    pos_dict = __get_dim_dict(h5_main.pos_dim_labels,
                              usid.hdf_utils.get_attr(h5_main.h5_pos_inds, 'units'),
                              h5_main.get_pos_values,
                              ignore_non_linear_dims=ignore_non_linear_dims)
    spec_dict = __get_dim_dict(h5_main.spec_dim_labels,
                               usid.hdf_utils.get_attr(h5_main.h5_spec_inds, 'units'),
                               h5_main.get_spec_values,
                               ignore_non_linear_dims=ignore_non_linear_dims)

    num_spec_dims = len(spec_dict)
    num_pos_dims = len(pos_dict)
    if verbose:
        print(num_pos_dims, num_spec_dims)

    ds_nd, success, dim_labs = usid.hdf_utils.reshape_to_n_dims(h5_main, get_labels=True)
    # Deliberately an equality test against True rather than a truthiness
    # test: any other value, even a truthy one, is treated as failure.
    if success != True:
        raise ValueError('Dataset could not be reshaped!')
    ds_nd = ds_nd.squeeze()
    if verbose:
        print(ds_nd.shape, dim_labs)

    # Normally, we might have been done but the order of the dimensions may be different in N-dim form and
    # attributes in ancillary dataset
    num_pos_dims = len(h5_main.pos_dim_labels)
    pos_dim_list = __assemble_dim_list(pos_dict, dim_labs[:num_pos_dims])
    spec_dim_list = __assemble_dim_list(spec_dict, dim_labs[num_pos_dims:])
    dim_list = pos_dim_list + spec_dim_list

    _, is_complex, is_compound, _, _ = usid.dtype_utils.check_dtype(h5_main)

    converter = BaseSignal
    if is_complex:
        converter = ComplexSignal

    # NOTE(review): the keyword arguments of this partial were missing from
    # the file. The set of keywords follows from the parameters of
    # __convert_to_hs_signal that the calls below do not supply; the values
    # (file name / dataset name of the HDF5 object, names of the non-singular
    # spectroscopic dimensions) are presumed - confirm.
    trunc_func = partial(__convert_to_hs_signal,
                         converter=converter,
                         dim_dict_list=dim_list,
                         spec_dim_names=[dim['name'] for dim in spec_dim_list],
                         h5_path=h5_main.file.filename,
                         h5_dset_path=h5_main.name)

    # Extracting the quantity and units of the main dataset
    quant, units = __split_descriptor(h5_main.data_descriptor)

    if is_compound:
        sig = []
        # Iterate over each dimension name:
        for name in ds_nd.dtype.names:
            q_sub, u_sub = __split_descriptor(name)
            # TODO: Check to make sure that this will work with Dask.array
            sig.append(trunc_func(ds_nd[name], q_sub, u_sub, sig_type=quant, verbose=verbose))
    else:
        sig = [trunc_func(ds_nd, quant, units, verbose=verbose)]

    return sig
# ######################## UTILITIES THAT SIMPLIFY WRITING TO H5USID FILES #############################################
def __flatten_nested_dictionary(d, parent_key='', sep='-'):
    """
    Flatten a nested dictionary into a single-level dictionary.

    Parameters
    ----------
    d : dict
        Dictionary that may contain other mappings as values. Keys are
        expected to be strings since they are concatenated.
    parent_key : str, Optional. Default = ''
        Prefix for every key of the result. No prefix is applied when empty.
    sep : str, Optional. Default = '-'
        Separator placed between the keys of successive nesting levels.

    Returns
    -------
    dict
        Single-level dictionary whose keys are the ``sep``-joined paths to
        the non-mapping values of ``d``. Empty nested mappings contribute no
        entries.
    """
    # The ABC aliases were removed from the ``collections`` namespace in
    # Python 3.10; import from collections.abc instead.
    from collections.abc import MutableMapping

    flat = dict()
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            flat.update(__flatten_nested_dictionary(v, new_key, sep=sep))
        else:
            flat[new_key] = v
    return flat
def __axes_list_to_dimensions(axes_list, data_shape):
    """
    Translate HyperSpy axes into pyUSID Dimension objects.

    Parameters
    ----------
    axes_list : iterable
        Axis objects exposing ``name``, ``units``, ``offset`` and ``scale``.
    data_shape : iterable of int
        Size of the data along each of the axes in ``axes_list``, in the same
        order.

    Returns
    -------
    list of usid.Dimension, or usid.Dimension
        Dimensions in the reverse order of ``axes_list``. If there are no
        axes, a single placeholder Dimension named 'Arb' is returned (not
        wrapped in a list).
    """
    dim_list = []
    for dim_ind, (dim_size, dim) in enumerate(zip(data_shape, axes_list)):
        # Fall back to generic names / units when the axis has none
        # NOTE(review): the attribute read in the two lines below was missing
        # from the file; ``dim.name`` is presumed by analogy with the units
        # handling that follows.
        dim_name = 'Unknown_Dimension_' + str(dim_ind)
        if isinstance(dim.name, str):
            temp = dim.name.strip()
            if len(temp) > 0:
                dim_name = temp

        dim_units = 'a. u.'
        if isinstance(dim.units, str):
            temp = dim.units.strip()
            if len(temp) > 0:
                dim_units = temp

        # use REAL dimension size rather than what is presented in the axes manager
        # NOTE(review): the values argument was truncated in the file.
        # Building the values from an integer index guarantees exactly
        # ``dim_size`` entries, free of floating point end-point effects.
        dim_values = dim.offset + dim.scale * np.arange(dim_size)
        dim_list.append(usid.Dimension(dim_name, dim_units, dim_values))

    if len(dim_list) == 0:
        return usid.Dimension('Arb', 'a. u.', 1)
    return dim_list[::-1]
# ######################################################################################################################
def read_all_main_datasets(filename, verbose=False, ignore_non_linear_dims=False):
    """
    Reads all USID Main datasets present in the provided HDF5 file into HyperSpy Signal objects

    Parameters
    ----------
    filename : str
        path to HDF5 file
    verbose : bool, Optional. Default = False
        Whether or not to print debugging statements
    ignore_non_linear_dims : bool, Optional. Default = False
        If True, parameters that were varied non-linearly only cause a
        warning and are treated as linearly varied parameters so that a
        Signal object can still be generated.
        If False, such parameters result in a ValueError.

    Returns
    -------
    list
        One entry per USID Main dataset found in the file. Each entry is the
        list of Signal objects returned by ``usidataset_to_signal`` for that
        dataset.

    Raises
    ------
    TypeError
        If ``filename`` is not a string.
    FileNotFoundError
        If no file exists at ``filename``.
    """
    if not isinstance(filename, str):
        raise TypeError('filename should be a string')
    if not os.path.isfile(filename):
        raise FileNotFoundError('No file found at: {}'.format(filename))

    # The conversion happens while the file is still open
    with h5py.File(filename, mode='r') as h5_f:
        return [usidataset_to_signal(dset, verbose=verbose,
                                     ignore_non_linear_dims=ignore_non_linear_dims)
                for dset in usid.hdf_utils.get_all_main(h5_f)]
def file_reader(filename, path_to_main_dataset=None, verbose=False, ignore_non_linear_dims=False):
    """
    Reads a USID Main dataset present in an HDF5 file into a HyperSpy Signal

    Parameters
    ----------
    filename : str
        path to HDF5 file
    path_to_main_dataset : str, Optional. Default = None
        Absolute path of USID Main HDF5 dataset.
        If None, the very first Main Dataset will be used
    verbose : bool, Optional. Default = False
        Whether or not to print debugging statements
    ignore_non_linear_dims : bool, Optional. Default = False
        If True, parameters that were varied non-linearly only cause a
        warning and are treated as linearly varied parameters so that a
        Signal object can still be generated.
        If False, such parameters result in a ValueError.

    Returns
    -------
    list
        Signal objects returned by ``usidataset_to_signal`` for the chosen
        dataset.

    Raises
    ------
    TypeError
        If ``filename`` or ``path_to_main_dataset`` is not a string.
    FileNotFoundError
        If no file exists at ``filename``.
    ValueError
        If no path was given and the file holds no USID Main dataset.
    """
    if not isinstance(filename, str):
        raise TypeError('filename should be a string')
    if not os.path.isfile(filename):
        raise FileNotFoundError('No file found at: {}'.format(filename))

    with h5py.File(filename, mode='r') as h5_f:
        if path_to_main_dataset is not None:
            if not isinstance(path_to_main_dataset, str):
                raise TypeError('path_to_main_dataset should be a string')
            h5_dset = h5_f[path_to_main_dataset]
            # All other checks will be handled by helper function
        else:
            all_main_dsets = usid.hdf_utils.get_all_main(h5_f)
            if len(all_main_dsets) == 0:
                # Fail with a clear message instead of an IndexError below
                raise ValueError('{} does not contain any USID Main datasets'.format(filename))
            if len(all_main_dsets) > 1:
                # Only warn when there genuinely is a choice to be made
                warn('{} contains multiple USID Main datasets. {} has been selected as the desired dataset. '
                     'If this is not the desired dataset, please supply the path to the main dataset via '
                     'the "path_to_main_dataset" keyword argument'.format(h5_f, all_main_dsets[0]))
            h5_dset = all_main_dsets[0]

        # Convert while the file is still open
        return usidataset_to_signal(h5_dset, verbose=verbose, ignore_non_linear_dims=ignore_non_linear_dims)
def file_writer(filename, object2save):
    """
    Writes a HyperSpy Signal object to a HDF5 file formatted according to USID

    Parameters
    ----------
    filename : str
        Path to target HDF5 file
    object2save : hyperspy.signals.Signal
        A HyperSpy signal

    Raises
    ------
    TypeError
        If ``filename`` is not a string or ``object2save`` is not a
        BaseSignal.
    FileExistsError
        If something already exists at ``filename``.
    ValueError
        If the number of axes does not match the dimensionality of the data.
    """
    if not isinstance(filename, str):
        raise TypeError('filename should be a string')
    if os.path.exists(filename):
        raise FileExistsError('A file already exists at: {}. Please delete the file at the location or specify a '
                              'different path for the file'.format(filename))
    if not isinstance(object2save, BaseSignal):
        raise TypeError('object2save should be a valid hyperspy.signals.BaseSignal object')

    # NOTE(review): every reference to the signal's data array was missing
    # from the file; ``object2save.data`` is presumed throughout, consistent
    # with the error message below - confirm.
    data = object2save.data
    axes_manager = object2save.axes_manager
    data_shape = data.shape

    # Not sure how to safely ignore spurious / additional dimensions
    if len(axes_manager.shape) != len(data_shape):
        raise ValueError('Number of dimensions in data (shape: {}) does not match number of axes: ({})'
                         '.'.format(data_shape, len(axes_manager.shape)))

    # Both the metadata and the original metadata are stored as flat
    # key-value pairs
    parm_dict = __flatten_nested_dictionary(object2save.metadata.as_dictionary())
    parm_dict.update(__flatten_nested_dictionary(object2save.original_metadata.as_dictionary(), parent_key='Original'))

    nav_dim = axes_manager.navigation_dimension
    if nav_dim > 0 and axes_manager.signal_dimension > 0:
        # Collapse navigation dimensions into rows and signal dimensions into columns
        data_2d = data.reshape(np.prod(data_shape[:nav_dim]), np.prod(data_shape[nav_dim:]))
        pos_dims = __axes_list_to_dimensions(axes_manager.navigation_axes, data_shape[:nav_dim])
        spec_dims = __axes_list_to_dimensions(axes_manager.signal_axes, data_shape[nav_dim:])
    elif nav_dim == 0:
        # only spectroscopic:
        data_2d = data.reshape(1, -1)
        pos_dims = __axes_list_to_dimensions(axes_manager.navigation_axes, [])
        spec_dims = __axes_list_to_dimensions(axes_manager.signal_axes, data_shape)
    else:
        # only position:
        data_2d = data.reshape(-1, 1)
        pos_dims = __axes_list_to_dimensions(axes_manager.navigation_axes, data_shape)
        spec_dims = __axes_list_to_dimensions(axes_manager.signal_axes, [])

    # TODO: Does HyperSpy store the physical quantity and units somewhere?
    tran = usid.NumpyTranslator()
    # NOTE(review): the end of this call was missing from the file. Passing
    # the flattened metadata is presumed since it is not used anywhere else.
    _ = tran.translate(filename, 'Raw_Data', data_2d, 'Unknown Quantity', 'Unknown Units', pos_dims, spec_dims,
                       parm_dict=parm_dict)
