Created February 24, 2023 10:54
OBF Support
Pure Python read only support for OBF files. The OBF file format originates from the Department of NanoBiophotonics
of the Max Planck Institute for Biophysical Chemistry in Göttingen, Germany. A specification can be found at This implementation is similar
to the File and Stack API of specpy ( Can also read MSR files (the OBF part of it).
Include in your project as "import obf_support"
- Open an OBF file with "obf = obf_support.File(path_to_file)", that will read all metadata (including stack metadata)
- Access the following attributes: format_version, description, stacks
- Close with "obf.close()" (optional, is also closed automatically on deletion of the File object)
- Each Stack has attributes: format_version, name, description, shape, lengths, offsets, data_type, data
- data returns a NumPy array containing the stack data (the stack data is loaded from the file lazily, i.e. when the
attribute is accessed the first time)
Example: see
Implementation notes:
- Relies on the struct module (
- In particular see the format characters of the struct module (
- Opened issue at about
+ Constant OMAS_BF_MAX_DIMENSIONS is not explained, the value is 15.
+ Data type of OMAS_DT is not specified, it's an enum type in C++, which is stored as uint32.
- In the future maybe:
+ Writing to OBF would in principle be possible (using the struct module)
+ Read part (slice) of data (use flush points), currently data is read all at once (impractical for very large files)
+ may still crash if not all data is written in a stack (haven't seen such a stack yet)
+ if there is a problem with a stack (like unknown data type), we could simply ignore the stack and print a warning instead
Author: Jan Keller-Findeisen (, May-July 2021, MIT licensed (see
from __future__ import annotations
from collections import namedtuple
import struct
import zlib
import math
import numpy as np
# single long value
long_fmt = '<Q'
long_len = struct.calcsize(long_fmt)
long_unpack = struct.Struct(long_fmt).unpack_from
# file header = char[10], uint32, uint64, uint32
file_header_fmt = '<10sIQI'
file_header_len = struct.calcsize(file_header_fmt)
file_header_unpack = struct.Struct(file_header_fmt).unpack_from
# stack header = char[16], uint32, uint32, uint32[15], double[15], double[15], uint32, uint32, uint32, uint32, uint32, uint64, uint64, uint64
stack_header_fmt = '<16s17I30d5I3Q'
stack_header_len = struct.calcsize(stack_header_fmt)
stack_header_unpack = struct.Struct(stack_header_fmt).unpack_from
# stack footer version 1 = uint32, uint32[15], uint32[15]
stack_footer_v1_fmt = '<31I'
stack_footer_v1_len = struct.calcsize(stack_footer_v1_fmt)
stack_footer_v1_unpack = struct.Struct(stack_footer_v1_fmt).unpack_from
# stack footer version 1A = version 1 + uint32 (we only store the difference)
stack_footer_v1a_fmt = '<I'
stack_footer_v1a_len = struct.calcsize(stack_footer_v1a_fmt)
stack_footer_v1a_unpack = struct.Struct(stack_footer_v1a_fmt).unpack_from
# OBF_SI_FRACTION = int32[2] = '2i'
# OBF_SI_UNIT = OBF_SI_FRACTION[9], double = '18id'
# stack footer version 2 = version1A + OBF_SI_UNIT, OBF_SI_UNIT[15]
stack_footer_v2_fmt = '<' + '18id' * 16
stack_footer_v2_len = struct.calcsize(stack_footer_v2_fmt)
stack_footer_v2_unpack = struct.Struct(stack_footer_v2_fmt).unpack_from
# stack footer version 3 = version 2 + uint64, uint64
stack_footer_v3_fmt = '<2Q'
stack_footer_v3_len = struct.calcsize(stack_footer_v3_fmt)
stack_footer_v3_unpack = struct.Struct(stack_footer_v3_fmt).unpack_from
# stack footer version 4 = version 3 + uint64
stack_footer_v4_fmt = '<Q'
stack_footer_v4_len = struct.calcsize(stack_footer_v4_fmt)
stack_footer_v4_unpack = struct.Struct(stack_footer_v4_fmt).unpack_from
# stack footer version 5 = version 4 + uin64, uint32
stack_footer_v5_fmt = '<QI'
stack_footer_v5_len = struct.calcsize(stack_footer_v5_fmt)
stack_footer_v5_unpack = struct.Struct(stack_footer_v5_fmt).unpack_from
# stack footer version 5a = version 5 + uint64
stack_footer_v5a_fmt = '<Q'
stack_footer_v5a_len = struct.calcsize(stack_footer_v5a_fmt)
stack_footer_v5a_unpack = struct.Struct(stack_footer_v5a_fmt).unpack_from
# stack footer version 6 = version 5a + uint64, uint64
stack_footer_v6_fmt = '<2Q'
stack_footer_v6_len = struct.calcsize(stack_footer_v6_fmt)
stack_footer_v6_unpack = struct.Struct(stack_footer_v6_fmt).unpack_from
# mapping of OMAS_DT to NumPy data types (see
omas_data_types = {0x00000001: np.uint8, 0x00000002: np.int8, 0x00000004: np.uint16, 0x00000008: np.int16,
0x00000010: np.uint32, 0x00000020: np.int32, 0x00000040: np.float32, 0x00000080: np.float64,
0x00001000: np.uint64, 0x00002000: np.int64, 0x00010000: np.bool_}
# named tuple used in SIUnit
Fraction = namedtuple('Fraction', ('numerator', 'denominator'))
class File:
OBF file access.
Reads the file header, stack header and stack footer of all stacks contained in the file.
- format_version
- description
- stacks (list of Stack)
def __init__(self, file_path: str):
Create a new OBF file access object by providing a file path:
obf = File(file_path)
:param file_path: path of the OBF file
# we cannot use "with open as" because we read the data stacks content later
# open the file at the given file path
self._file = open(file_path, 'rb'), 2) # seek to the end of the file
file_size = self._file.tell()
# read the obf file header
data =
magic_header, self.format_version, first_stack_pos, description_len = file_header_unpack(data)
if magic_header != FILE_MAGIC_HEADER:
raise RuntimeError('Magic file header not found.')
# read file description
self.description = self._read_string(description_len)
# read file meta data
if self.format_version >= 2:
# read meta data position
file_meta_data_pos = long_unpack([0]
self.meta = {}
key = self._read_string()
while len(key) > 0:
value = self._read_string()
self.meta[key] = value
key = self._read_string()
# read all the stacks
next_stack_pos = first_stack_pos
self.stacks = []
while next_stack_pos != 0:
# create new stack
stack = Stack(self)
# seek to position of next stack header
# read stack header
data =
values = stack_header_unpack(data)
if values[0] != STACK_MAGIC_HEADER:
raise RuntimeError('Magic stack header not found.')
# interpret stack header
stack.format_version = values[1]
stack.rank = values[2]
stack.shape = values[3:17][:stack.rank]
stack.lengths = values[18:32][:stack.rank]
stack.offsets = values[33:47][:stack.rank]
value = values[48]
if value not in omas_data_types:
raise RuntimeError('Unsupported data type {}.'.format(value))
stack.data_type = omas_data_types[value]
stack._compression_type = values[49]
# compression_level = values[50] # relatively uninteresting, we ignore it
name_length = values[51]
description_length = values[52]
stack._data_length = values[54] # data_len_disk
next_stack_pos = values[55] = self._read_string(name_length)
stack.description = self._read_string(description_length)
stack._data_pos = self._file.tell()
footer_pos = stack._data_pos + stack._data_length
# additionally we compute a dimensionality of a stack which is the number of elements in shape minus
# trailing single value dimensions; helps to find the 2D image stacks for example
dimensionality = len(stack.shape)
while dimensionality > 1 and stack.shape[dimensionality - 1] == 1:
dimensionality -= 1
stack.dimensionality = dimensionality
# default footer
footer = {
'stack_end_used_disk': file_size,
'chunk_positions': [[0, 0]]
# read and interpret stack footer (for format version >= 1)
if stack.format_version >= 1:
# read version 1 part
data =
values = stack_footer_v1_unpack(data)
footer_length = values[0]
footer['has_col_positions'] = values[1:15][:stack.rank]
footer['has_col_labels'] = values[16:30][:stack.rank]
if stack.format_version >= 2:
# read version 1A part
data =
values = stack_footer_v1a_unpack(data)
footer['metadata_length'] = values[0]
# read version 2 part
data =
values = stack_footer_v2_unpack(data)
stack.si_value = SIUnit(values[0:19])
stack.si_dimensions = []
for i in range(stack.rank):
stack.si_dimensions.append(SIUnit(values[i * 19:(i + 1) * 19]))
if stack.format_version >= 3:
# read version 3 part
data =
values = stack_footer_v3_unpack(data)
footer['num_flush_points'] = values[0]
footer['flush_block_size'] = values[1]
if stack.format_version >= 4:
# read version 4 part
data =
values = stack_footer_v4_unpack(data)
footer['tag_dictionary_length'] = values[0]
if stack.format_version >= 5:
# read version 5 part
data =
values = stack_footer_v5_unpack(data)
footer['min_format_version'] = values[1]
if stack.format_version >= 6:
# read version 5a part
data =
values = stack_footer_v5a_unpack(data)
footer['stack_end_used_disk'] = values[0]
# read version 6 part
data =
values = stack_footer_v6_unpack(data)
footer['samples_written'] = values[0]
footer['num_chunk_positions'] = values[1]
# omit possible footer entries from later versions + footer_length)
# read label strings
stack.labels = [self._read_string() for _ in range(stack.rank)]
# read col positions
if 'has_col_positions' in footer:
stack.col_positions = {}
for axis, has_them in enumerate(footer['has_col_positions']):
if has_them:
# read doubles as positions
fmt = '<{}d'.format(stack.shape[axis])
data =
values = struct.unpack_from(fmt, data)
stack.col_positions[axis] = values
# read col labels
if 'has_col_labels' in footer:
stack.col_labels = {}
for axis, has_them in enumerate(footer['has_col_labels']):
if has_them:
# read labels
labels = []
for _ in range(stack.shape[axis]):
label = self._read_string()
stack.col_labels[axis] = labels
# read metadata
if 'metadata_length' in footer:
stack.metadata = self._read_string(footer['metadata_length'])
# read flush positions
if 'num_flush_points' in footer:
length = footer['num_flush_points']
fmt = '<{}Q'.format(length)
data =
values = struct.unpack_from(fmt, data)
footer['flush_positions'] = values
# read tag dictionary
if 'tag_dictionary_length' in footer:
stack.tag_dictionary = {}
length = footer['tag_dictionary_length']
if length > 0:
# read key, value pairs until len(key) is zero
key = self._read_string()
while len(key) > 0:
value = self._read_string()
stack.tag_dictionary[key] = value
key = self._read_string()
# read chunk positions
chunk_positions = [[0, 0]]
for _ in range(footer.get('num_chunk_positions', 0)):
fmt = '<2Q'
data =
values = struct.unpack_from(fmt, data)
footer['chunk_positions'] = chunk_positions
stack.footer = footer
# append stack to list
def find_stack_by_name(self, name_part: str) -> list[Stack]:
Small convenience method. Will return all stacks in this OBF file where string is contained in the stack name.
return [stack for stack in self.stacks if name_part in]
def close(self):
Closes the file if it isn't closed already.
if not self._file.closed:
def _read_string(self, length: int = None) -> str:
For internal use only.
:param length: Number of bytes to read, if none is given, reads the length first
:return: Decoded string
if length is None:
fmt = '<I'
data =
length = struct.unpack_from(fmt, data)[0]
fmt = '<{}s'.format(length)
data =
string = struct.unpack_from(fmt, data)[0] # unpack always returns a tuple
string = string.decode('utf-8')
except UnicodeDecodeError:
# fallback encoding for very old (<2008) files
string = string.decode('iso-8859-1')
return string
def _read_stack(self, stack: Stack):
Internal function. Reads the data array from a stack from the OBF file as a NumPy array and stores it as the
_data attribute of the stack. If called a second time, will re-read the stack.
Supporting the chunked/interleaved storage of stacks with stack format version 6, this is a bit more
elaborate and also includes a bit of heuristic estimation of the number of compressed bytes
that need to be read. The problem is that the length of the last chunk of chunked data does not seem to be
properly specified, so the total number of bytes contained in a compressed data stack is unknown, and
we need to work around it.
:param stack: A Stack object containing all metadata
# read the whole stack data (works for stack format versions <= 5)
# data =
# with chunks (works for min_format_version 6 and also below)
pos = 0
idx = 0
seek_pos = stack._data_pos
data = []
if stack._compression_type == 1 and stack.footer['samples_written'] > 0:
# if compressed and not empty: we are not completely sure about the length of the written data
if 'num_chunk_positions' in stack.footer:
# stack format version >= 6 (with chunks)
bytes_written = min(stack.footer['samples_written']*stack.data_type().itemsize+16, stack.footer['chunk_positions'][-1][0] + stack.footer['stack_end_used_disk'] - stack.footer['chunk_positions'][-1][1]) # this is a bit heuristic and not documented, but I don't want to read too much
# stack.footer['chunk_positions'][-1][0] + stack.footer['stack_end_used_disk'] - stack.footer['chunk_positions'][-1][1] is the maximal number of bytes between the beginning of the last chunk and the end of the data
# stack.footer['samples_written']*stack.data_type().itemsize+16 is the size of the uncompressed data plus a small overhead for the zip header that is also divisible by all data type sizes in bytes
# stack format version < 6 (without chunks)
bytes_written = stack._data_length
# if not compressed or empty, we know the number of bytes exactly
bytes_written = stack.footer['samples_written'] * stack.data_type().itemsize
# read (using the algorithm outlined in the format description)==
while pos < bytes_written:
bytes_to_read = bytes_written - pos
if idx < len(stack.footer['chunk_positions']):
if pos + bytes_to_read > stack.footer['chunk_positions'][idx][0]: # chunk_positions[0] = logical offset, [1] = file offset
bytes_to_read = stack.footer['chunk_positions'][idx][0] - pos
seek_pos = stack.footer['chunk_positions'][idx][1] + stack._data_pos
idx += 1
if bytes_to_read > 0:
pos += bytes_to_read
data = b"".join(data) # is there a more efficient way to concatenate byte arrays?
# if compressed, uncompress
if stack._compression_type == 1:
zobj = zlib.decompressobj()
data = zobj.decompress(data)
# data = zlib.decompress(data) # that gave "zlib.error: Error -5 while decompressing data: incomplete or truncated stream" sometimes
# convert to numpy array
array = np.frombuffer(data, dtype=stack.data_type)
array = array[:stack.footer['samples_written']] # not sure if this is needed anymore
# reshape (with reversed shape and then reverse order of dimensions)
array = np.reshape(array, stack.shape[::-1])
array = np.transpose(array)
# store
stack._data = array
def __del__(self):
Make sure that the file is closed upon deletion.
class Stack:
A Stack class, holds attributes about stacks
def __init__(self, file: File):
Initialize with a File object.
self.file = file
self._data = None
def __getattr__(self, name: str):
Computes a few convenience attributes on the fly as well as lazy loading of the data
:param name: either "pixel_sizes" or "data"
if name == 'pixel_sizes':
# if a dimension is 0, the pixel size is NaN in that direction
pixel_sizes = [length / n if n > 0 else math.nan for length, n in zip(self.lengths, self.shape)]
return pixel_sizes
elif name == 'data':
if self._data is None:
# first time data is called, load it
return self._data
class SIUnit:
OMAS_SIUNIT reimplementation
exponents = Meters (M), Kilograms (KG), Seconds (S), Amperes (A), Kelvin (K), Moles (MOL), Candela (CD), Radian (R), Steradian (SR)
unitnames = ['m', 'kg', 's', 'A', 'K', 'mol', 'CD', 'R', 'SR']
def __init__(self, values):
Initialize with 19 double values.
if len(values) != 19:
raise RuntimeError('SI unit needs 19 values to initialize.')
self.exponents = []
for i in range(9):
self.exponents.append(Fraction(values[i * 2], values[i * 2 + 1]))
self.scalefactor = values[18]
def __str__(self) -> str:
Some kind of meaningful string representation. Could still be improved.
s = '{} '.format(self.scalefactor)
for exponent, unit in zip(self.exponents, SIUnit.unitnames):
if exponent.numerator != 0:
s += '{}^({}/{})'.format(unit, *exponent)
return s
Example for the OBF file format reader implementation in Prints some useful info and shows first
XY slices of all stacks with at least two dimensions. For a documentation of the API see file itself.
import matplotlib.pyplot as plt
import as cm
import obf_support
if __name__ == '__main__':
# add a path to an ".obf" or ".msr" file here
file_path = 'path_to_obf_file'
obf = obf_support.File(file_path)
print('file: {}'.format(file_path))
print(' format version: {}'.format(obf.format_version))
print(' description: "{}"'.format(obf.description))
print(' contains {} stacks'.format(len(obf.stacks)))
for index, stack in enumerate(obf.stacks[-1:]):
print('\nstack {}'.format(index))
print(' format version: {}'.format(stack.format_version))
print(' name: "{}"'.format(
print(' description: "{}"'.format(stack.description))
print(' shape: {}'.format(stack.shape))
print(' dimensionality: {}'.format(stack.dimensionality))
print(' lengths: {}'.format(stack.lengths))
print(' pixel sizes: {}'.format(stack.pixel_sizes))
print(' offsets: {}'.format(stack.offsets))
print(' data type: {}'.format(stack.data_type.__name__))
# load stack data and show first 2D image
data =
if data.size > 0: # don't display empty stacks
fig, ax = plt.subplots()
idx = [slice(None), slice(None)] + [0] * (len(data.shape) - 2)
im = ax.imshow(data[tuple(idx)].reshape(data.shape[:2]),
