Created
April 29, 2019 20:59
-
-
Save ssomnath/097d2bb7c1468709ce2899512deb5b6d to your computer and use it in GitHub Desktop.
Utilities to read and write to USID HDF5 files from HyperSpy Signal objects
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from warnings import warn | |
from functools import partial | |
import collections | |
import h5py | |
import numpy as np | |
import dask.array as da | |
from hyperspy.signals import BaseSignal, ComplexSignal | |
import pyUSID as usid | |
# Plugin characteristics | |
# ---------------------- | |
format_name = 'USID' | |
description = \ | |
'Data structured according to the Universal Spectroscopic and Imaging Data (USID) model written into ' \ | |
'Hierarchical Data Format (HDF5) files' | |
full_support = False | |
# Recognised file extension | |
file_extensions = ['hdf5'] | |
default_extension = 0 | |
# Reading capabilities | |
reads_images = True | |
reads_spectrum = True | |
reads_spectrum_image = True | |
# Writing capabilities | |
writes_images = True | |
writes_spectrum = True | |
writes_spectrum_image = True | |
# Writing capabilities | |
writes = True | |
version = "0.0.5.1" | |
# ######################## UTILITIES THAT SIMPLIFY READING FROM H5USID FILES ########################################### | |
def __get_dim_dict(labels, units, val_func, ignore_non_linear_dims=True): | |
dim_dict = dict() | |
for dim_name, units in zip(labels, units): | |
dim_vals = val_func(dim_name) | |
if len(dim_vals) == 1: | |
# Empty dimension! | |
continue | |
else: | |
step_size = np.unique(np.diff(dim_vals)) | |
if len(step_size) > 1: | |
# often we end up here. In most cases, | |
step_avg = step_size.max() | |
step_size -= step_avg | |
var = np.mean(np.abs(step_size)) | |
if var / step_avg < 1E-3: | |
step_size = [step_avg] | |
else: | |
if ignore_non_linear_dims: | |
warn('Ignoring non-linearity of dimension: {}'.format(dim_name)) | |
step_size = [1] | |
dim_vals[0] = 0 | |
else: | |
raise ValueError('Cannot load provided dataset. ' | |
'Parameter: {} was varied non-linearly'.format(dim_name)) | |
step_size = step_size[0] | |
dim_dict[dim_name] = {'size': len(dim_vals), | |
'name': dim_name, | |
'units': units, | |
'scale': step_size, | |
'offset': dim_vals[0]} | |
return dim_dict | |
def __assemble_dim_list(dim_dict, dim_names): | |
dim_list = [] | |
for dim_name in dim_names: | |
try: | |
dim_list.append(dim_dict[dim_name]) | |
except KeyError: | |
pass | |
return dim_list | |
def __split_descriptor(desc): | |
ind = desc.rfind('(') | |
if ind < 0: | |
ind = desc.rfind('[') | |
if ind < 0: | |
return '', '' | |
quant = desc[:ind].strip() | |
units = desc[ind:] | |
units = units.replace('(', '') | |
units = units.replace(')', '') | |
units = units.replace('[', '') | |
units = units.replace(']', '') | |
return quant, units | |
def __convert_to_hs_signal(ndim_form, quantity, units, converter, dim_dict_list, spec_dim_names, | |
h5_path, h5_dset_path, sig_type='', verbose=False): | |
sig = converter(ndim_form, axes=dim_dict_list) | |
# Embed the quantity and units metadata before we lose them: | |
sig.metadata.Signal.add_dictionary({'quantity': quantity, | |
'units': units, | |
'signal_type': sig_type}) | |
sig.metadata.General.add_dictionary({'original_filename': h5_path, | |
'dataset_path': h5_dset_path, | |
'original_file_type': 'USID HDF5', | |
'pyUSID_version': usid.__version__}) | |
if verbose: | |
print('Signal immidiately after casting:') | |
print(sig) | |
print(sig.axes_manager) | |
if len(spec_dim_names) == 0: | |
if verbose: | |
print('No Spectroscopic dimensions - so transposing') | |
sig = sig.transpose() | |
else: | |
if verbose: | |
print('Explicitely stating spec dims') | |
sig = sig.as_signal2D(spec_dim_names) | |
if verbose: | |
print('Signal after separation of dimensions:') | |
print(sig) | |
print(sig.axes_manager) | |
return sig | |
def usidataset_to_signal(h5_main, verbose=False, ignore_non_linear_dims=True): | |
""" | |
Converts a single specified USIDataset object to one or more Signal objects | |
Parameters | |
---------- | |
h5_main : pyUSID.USIDataset object | |
USID Main dataset | |
verbose : bool, Optional. Default = False | |
Whether or not to print debugging statements | |
ignore_non_linear_dims : bool, Optional | |
If True, parameters that were varied non-linearly in the desired dataset will result in Exceptions. | |
Else, all such non-linearly varied parameters will be treated as linearly varied parameters and | |
a Signal object will be generated. | |
Returns | |
------- | |
list of hyperspy.signals.BaseSignal objects. USIDatasets with compound datatypes are broken down to multiple | |
Signal objects. | |
""" | |
h5_main = usid.USIDataset(h5_main) | |
# TODO: Cannot handle data without N-dimensional form | |
# First get dictionary of axes that HyperSpy likes to see. Ignore singular dimensions | |
pos_dict = __get_dim_dict(h5_main.pos_dim_labels, | |
usid.hdf_utils.get_attr(h5_main.h5_pos_inds, 'units'), | |
h5_main.get_pos_values, | |
ignore_non_linear_dims=ignore_non_linear_dims) | |
spec_dict = __get_dim_dict(h5_main.spec_dim_labels, | |
usid.hdf_utils.get_attr(h5_main.h5_spec_inds, 'units'), | |
h5_main.get_spec_values, | |
ignore_non_linear_dims=ignore_non_linear_dims) | |
num_spec_dims = len(spec_dict) | |
num_pos_dims = len(pos_dict) | |
if verbose: | |
print(num_pos_dims, num_spec_dims) | |
ds_nd, success, dim_labs = usid.hdf_utils.reshape_to_n_dims(h5_main, get_labels=True) | |
if success != True: | |
raise ValueError('Dataset could not be reshaped!') | |
ds_nd = ds_nd.squeeze() | |
if verbose: | |
print(ds_nd.shape, dim_labs) | |
""" | |
Normally, we might have been done but the order of the dimensions may be different in N-dim form and | |
attributes in ancillary dataset | |
""" | |
num_pos_dims = len(h5_main.pos_dim_labels) | |
pos_dim_list = __assemble_dim_list(pos_dict, dim_labs[:num_pos_dims]) | |
spec_dim_list = __assemble_dim_list(spec_dict, dim_labs[num_pos_dims:]) | |
dim_list = pos_dim_list + spec_dim_list | |
_, is_complex, is_compound, _, _ = usid.dtype_utils.check_dtype(h5_main) | |
converter = BaseSignal | |
if is_complex: | |
converter = ComplexSignal | |
trunc_func = partial(__convert_to_hs_signal, | |
converter=converter, | |
dim_dict_list=dim_list, | |
spec_dim_names=list(spec_dict.keys()), | |
h5_path=h5_main.file.filename, | |
h5_dset_path=h5_main.name) | |
# Extracting the quantity and units of the main dataset | |
quant, units = __split_descriptor(h5_main.data_descriptor) | |
if is_compound: | |
sig = [] | |
# Iterate over each dimension name: | |
for name in ds_nd.dtype.names: | |
q_sub, u_sub = __split_descriptor(name) | |
# TODO: Check to make sure that this will work with Dask.array | |
sig.append(trunc_func(ds_nd[name], q_sub, u_sub, sig_type=quant, verbose=verbose)) | |
else: | |
sig = [trunc_func(ds_nd, quant, units, verbose=verbose)] | |
return sig | |
# ######################## UTILITIES THAT SIMPLIFY WRITING TO H5USID FILES ############################################# | |
def __flatten_nested_dictionary(d, parent_key='', sep='-'): | |
# https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys | |
items = [] | |
for k, v in d.items(): | |
new_key = parent_key + sep + k if parent_key else k | |
if isinstance(v, collections.MutableMapping): | |
items.extend(__flatten_nested_dictionary(v, new_key, sep=sep).items()) | |
else: | |
items.append((new_key, v)) | |
return dict(items) | |
def __axes_list_to_dimensions(axes_list, data_shape): | |
dim_list = [] | |
for dim_ind, (dim_size, dim) in enumerate(zip(data_shape, axes_list)): | |
dim_name = 'Unknown_Dimension_' + str(dim_ind) | |
if isinstance(dim.name, str): | |
temp = dim.name.strip() | |
if len(temp) > 0: | |
dim_name = temp | |
dim_units = 'a. u.' | |
if isinstance(dim.units, str): | |
temp = dim.units.strip() | |
if len(temp) > 0: | |
dim_units = temp | |
# use REAL dimension size rather than what is presented in the axes manager | |
dim_list.append(usid.Dimension(dim_name, dim_units, | |
np.arange(dim.offset, | |
dim.offset + dim_size * dim.scale, | |
dim.scale))) | |
if len(dim_list) == 0: | |
return usid.Dimension('Arb', 'a. u.', 1) | |
return dim_list[::-1] | |
# ###################################################################################################################### | |
def read_all_main_datasets(filename, verbose=False, ignore_non_linear_dims=False): | |
""" | |
Reads all USID Main datasets present in the provided HDF5 file into HyperSpy Signal objects | |
Parameters | |
---------- | |
filename : str | |
path to HDF5 file | |
verbose : bool, Optional. Default = False | |
Whether or not to print debugging statements | |
ignore_non_linear_dims : bool, Optional | |
If True, parameters that were varied non-linearly in the desired dataset will result in Exceptions. | |
Else, all such non-linearly varied parameters will be treated as linearly varied parameters and | |
a Signal object will be generated. | |
Returns | |
------- | |
list of hyperspy.signals.Signal object | |
""" | |
if not isinstance(filename, str): | |
raise TypeError('filename should be a string') | |
if not os.path.isfile(filename): | |
raise FileNotFoundError('No file found at: {}'.format(filename)) | |
with h5py.File(filename, mode='r') as h5_f: | |
all_main_dsets = usid.hdf_utils.get_all_main(h5_f) | |
signals = [] | |
for dset in all_main_dsets: | |
signals.append(usidataset_to_signal(dset, verbose=verbose, ignore_non_linear_dims=ignore_non_linear_dims)) | |
return signals | |
def file_reader(filename, path_to_main_dataset=None, verbose=False, ignore_non_linear_dims=False): | |
""" | |
Reads a USID Main dataset present in an HDF5 file into a HyperSpy Signal | |
Parameters | |
---------- | |
filename : str | |
path to HDF5 file | |
path_to_main_dataset : str, Optional. Default = None | |
Absolute path of USID Main HDF5 dataset. | |
If None, the very first Main Dataset will be used | |
verbose : bool, Optional. Default = False | |
Whether or not to print debugging statements | |
ignore_non_linear_dims : bool, Optional | |
If True, parameters that were varied non-linearly in the desired dataset will result in Exceptions. | |
Else, all such non-linearly varied parameters will be treated as linearly varied parameters and | |
a Signal object will be generated. | |
Returns | |
------- | |
list of hyperspy.signals.Signal object | |
""" | |
if not isinstance(filename, str): | |
raise TypeError('filename should be a string') | |
if not os.path.isfile(filename): | |
raise FileNotFoundError('No file found at: {}'.format(filename)) | |
with h5py.File(filename, mode='r') as h5_f: | |
if path_to_main_dataset is not None: | |
if not isinstance(path_to_main_dataset, str): | |
raise TypeError('path_to_main_dataset should be a string') | |
h5_dset = h5_f[path_to_main_dataset] | |
# All other checks will be handled by helper function | |
else: | |
all_main_dsets = usid.hdf_utils.get_all_main(h5_f) | |
if len(all_main_dsets) > 0: | |
warn('{} contains multiple USID Main datasets. {} has been selected as the desired dataset.' | |
'If this is not the desired dataset, please supply the path to the main dataset via' | |
'the "path_to_main_dataset" keyword argument'.format(h5_f, all_main_dsets[0])) | |
h5_dset = all_main_dsets[0] | |
return usidataset_to_signal(h5_dset, verbose=verbose, ignore_non_linear_dims=ignore_non_linear_dims) | |
def file_writer(filename, object2save): | |
""" | |
Writes a HyperSpy Signal object to a HDF5 file formatted according to USID | |
Parameters | |
---------- | |
filename : str | |
Path to target HDF5 file | |
object2save : hyperspy.signals.Signal | |
A HyperSpy signal | |
""" | |
if not isinstance(filename, str): | |
raise TypeError('filename should be a string') | |
if os.path.exists(filename): | |
raise FileExistsError('A file already exists at: {}. Please delete the file at the location or specify a ' | |
'different path for the file'.format(filename)) | |
if not isinstance(object2save, BaseSignal): | |
raise TypeError('object2save should be a valid hyperspy.signals.BaseSignal object') | |
# Not sure how to safely ignore spurious / additional dimensions | |
if len(object2save.axes_manager.shape) != len(object2save.data.shape): | |
raise ValueError('Number of dimensions in data (shape: {}) does not match number of axes: ({})' | |
'.'.format(object2save.data.shape, len(object2save.axes_manager.shape))) | |
parm_dict = __flatten_nested_dictionary(object2save.metadata.as_dictionary()) | |
parm_dict.update(__flatten_nested_dictionary(object2save.original_metadata.as_dictionary(), parent_key='Original')) | |
data_2d = object2save.data | |
if object2save.axes_manager.navigation_dimension > 0 and object2save.axes_manager.signal_dimension > 0: | |
data_2d = data_2d.reshape(np.prod(object2save.data.shape[:object2save.axes_manager.navigation_dimension]), | |
np.prod(object2save.data.shape[object2save.axes_manager.navigation_dimension:])) | |
pos_dims = __axes_list_to_dimensions(object2save.axes_manager.navigation_axes, | |
object2save.data.shape[:object2save.axes_manager.navigation_dimension]) | |
spec_dims = __axes_list_to_dimensions(object2save.axes_manager.signal_axes, | |
object2save.data.shape[object2save.axes_manager.navigation_dimension:]) | |
elif object2save.axes_manager.navigation_dimension == 0: | |
# only spectroscopic: | |
data_2d = data_2d.reshape(1, -1) | |
pos_dims = __axes_list_to_dimensions(object2save.axes_manager.navigation_axes, []) | |
spec_dims = __axes_list_to_dimensions(object2save.axes_manager.signal_axes, object2save.data.shape) | |
else: | |
data_2d = data_2d.reshape(-1, 1) | |
pos_dims = __axes_list_to_dimensions(object2save.axes_manager.navigation_axes, object2save.data.shape) | |
spec_dims = __axes_list_to_dimensions(object2save.axes_manager.signal_axes, []) | |
# TODO: Does HyperSpy store the physical quantity and units somewhere? | |
tran = usid.NumpyTranslator() | |
_ = tran.translate(filename, 'Raw_Data', data_2d, 'Unknown Quantity', 'Unknown Units', pos_dims, spec_dims, | |
parm_dict=parm_dict) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment