Created
June 14, 2016 16:18
-
-
Save flocke/1c6adc38c7b0191245017946a891debf to your computer and use it in GitHub Desktop.
Very simple (and specific) Python module to read a .tdms_index file produced by LabVIEW (heavily based on npTDMS)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
#
# DISCLAIMER:
# This module is very specific to our data structure, nevertheless it may provide a base for someone who wants to create
# a python module to parse their own .tdms_index files.
#
# CREDITS:
# A lot of the code re-used in this module is directly taken from npTDMS (https://github.com/adamreeve/npTDMS) and all
# credit goes to Adam Reeve as the original author.
#
########################################################################################################################
# Import the required system modules
import collections
import datetime
import numpy
import os
import os.path
import re
import struct
# pytz is optional: it is only used to attach the UTC time zone to parsed
# time stamps; without it time stamps come back as naive datetimes.
try:
    import pytz
except ImportError:
    pytz = None
########################################################################################################################
## Definitions for reading tdms_index files, taken from the npTDMS module

# Bit masks for the flags in a segment lead-in's "table of contents" field.
tocProperties = {
    'kTocMetaData': (1 << 1),
    'kTocRawData': (1 << 3),
    'kTocDAQmxRawData': (1 << 7),
    'kTocInterleavedData': (1 << 5),
    'kTocBigEndian': (1 << 6),
    'kTocNewObjList': (1 << 2)
}

# Description of one TDMS data type: its name, the struct format character(s)
# used to unpack it, its size in bytes, and the matching numpy scalar type
# (None where no direct mapping exists).
DataType = collections.namedtuple("DataType", ('name', 'struct', 'length', 'nptype'))

# Type codes 0x00..0x0D are consecutive, so enumerate() builds that range.
tdsDataTypes = dict(enumerate((
    DataType('tdsTypeVoid', None, 0, None),
    DataType('tdsTypeI8', 'b', 1, numpy.int8),
    DataType('tdsTypeI16', 'h', 2, numpy.int16),
    DataType('tdsTypeI32', 'l', 4, numpy.int32),
    DataType('tdsTypeI64', 'q', 8, numpy.int64),
    DataType('tdsTypeU8', 'B', 1, numpy.uint8),
    DataType('tdsTypeU16', 'H', 2, numpy.uint16),
    DataType('tdsTypeU32', 'L', 4, numpy.uint32),
    DataType('tdsTypeU64', 'Q', 8, numpy.uint64),
    DataType('tdsTypeSingleFloat', 'f', 4, numpy.single),
    DataType('tdsTypeDoubleFloat', 'd', 8, numpy.double),
    DataType('tdsTypeExtendedFloat', None, None, None),
    DataType('tdsTypeDoubleFloatWithUnit', None, 8, None),
    DataType('tdsTypeExtendedFloatWithUnit', None, None, None)
)))
# The remaining type codes are sparse and added explicitly.
tdsDataTypes.update({
    0x19: DataType('tdsTypeSingleFloatWithUnit', None, 4, None),
    0x20: DataType('tdsTypeString', None, None, None),
    # numpy.bool8 was removed in NumPy 2.0; numpy.bool_ is the stable alias.
    0x21: DataType('tdsTypeBoolean', 'b', 1, numpy.bool_),
    0x44: DataType('tdsTypeTimeStamp', 'Qq', 16, None),
    0xFFFFFFFF: DataType('tdsTypeDAQmxRawData', None, None, None)
})

if pytz:
    # Use UTC time zone if pytz is installed
    timezone = pytz.utc
else:
    timezone = None
######################################################################################################################## | |
def read_string(file): | |
s = file.read(4) | |
length = struct.unpack('<L', s)[0] | |
return(file.read(length).decode("utf-8")) | |
def read_type(file, data_type, endianness):
    """Read and return a single value of *data_type* from *file*.

    *endianness* is a struct byte-order prefix ('<' or '>').  Time stamps
    are converted to datetime objects; every other supported type is
    unpacked directly with the struct module.  Raises ValueError for
    types that have no struct format or length.
    """
    if data_type.name == 'tdsTypeTimeStamp':
        # Time stamps are stored as number of seconds since
        # 01/01/1904 00:00:00.00 UTC, ignoring leap seconds,
        # and number of 2^-64 fractions of a second.
        # Note that the TDMS epoch is not the Unix epoch.
        raw = file.read(data_type.length)
        (fractions, seconds) = struct.unpack('{}{}'.format(endianness, data_type.struct), raw)
        epoch = datetime.datetime(1904, 1, 1, 0, 0, 0, tzinfo=timezone)
        # 10**6 / 2**64 == 5**6 / 2**58, so this converts the fractional
        # part straight to microseconds.
        microseconds = float(fractions) * 5 ** 6 / 2 ** 58
        # Adding timedelta with seconds ignores leap
        # seconds, so this is correct
        return epoch + datetime.timedelta(seconds=seconds, microseconds=microseconds)
    if data_type.struct is not None and data_type.length is not None:
        raw = file.read(data_type.length)
        return struct.unpack('{}{}'.format(endianness, data_type.struct), raw)[0]
    raise ValueError("Unsupported data type to read, {}.".format(data_type.name))
######################################################################################################################## | |
def read_property(file):
    """Read one object property (name, data type, value) from *file*.

    Returns a dict with keys 'name', 'dtype' and 'value'.  String values
    carry their own length prefix and so bypass read_type.
    """
    name = read_string(file)
    type_code = struct.unpack("<L", file.read(4))[0]
    dtype = tdsDataTypes[type_code]
    if dtype.name == 'tdsTypeString':
        value = read_string(file)
    else:
        value = read_type(file, dtype, "<")
    return {'name': name, 'dtype': dtype, 'value': value}
def read_object(file):
    """Read the metadata describing a single object in a segment.

    Returns a dict with the object's path, raw-data layout and property
    list.  A raw data index of 0xFFFFFFFF means the object carries no raw
    data; 0x00000000 means the layout is inherited from the matching
    object of the previous segment (filled in by the caller).
    """
    obj = {'path': read_string(file)}
    raw_data_index = struct.unpack("<L", file.read(4))[0]
    obj['has_data'] = raw_data_index != 0xFFFFFFFF
    obj['inherit_last_segment'] = raw_data_index == 0x00000000
    if raw_data_index not in (0xFFFFFFFF, 0x00000000):
        # A fresh raw data index: data type, dimensions, value count and
        # (for strings) the explicit total byte size follow.
        dtype = tdsDataTypes[struct.unpack("<L", file.read(4))[0]]
        obj['dtype'] = dtype
        obj['dimensions'] = struct.unpack("<L", file.read(4))[0]
        obj['values'] = struct.unpack("<Q", file.read(8))[0]
        if dtype.name in ('tdsTypeString', ):
            # Strings are variable length, so their total size is stored
            # explicitly instead of being derived from the value count.
            obj['data_size'] = struct.unpack("<Q", file.read(8))[0]
        else:
            obj['data_size'] = obj['values'] * dtype.length * obj['dimensions']
    num_props = struct.unpack("<L", file.read(4))[0]
    obj['num_props'] = num_props
    obj['properties'] = [read_property(file) for _ in range(num_props)]
    return obj
def calc_chunks(segment_data):
    """Work out how many identical chunks of raw data the segment holds.

    The raw data of a segment may repeat in equally-sized chunks; the
    chunk count is the total raw data size divided by the size of one
    chunk.  Stores 'num_chunks' on *segment_data*, scales each
    data-carrying object's 'values' by it, and returns *segment_data*.

    Raises ValueError if a size is negative, if the total size is not an
    exact multiple of the chunk size, or if raw data is present although
    no object describes any.
    """
    data_size = numpy.sum([o['data_size'] for o in segment_data['objects'] if o['has_data']])
    # NOTE(review): read_segment appears to unpack the two lead-in offsets
    # in swapped order, so 'raw_data_offset' actually holds the next
    # segment offset and vice versa; this difference is then the total raw
    # data size -- confirm against the TDMS file format specification.
    total_data_size = segment_data['raw_data_offset'] - segment_data['next_segment_offset']
    if data_size < 0 or total_data_size < 0:
        raise ValueError("negative data size")
    if data_size == 0:
        # No object carries raw data; the original code divided by zero
        # here.  Raw data without a describing object is an error.
        if total_data_size != 0:
            raise ValueError("total data size is not a multiple of the data size")
        segment_data['num_chunks'] = 0
        return segment_data
    if total_data_size % data_size != 0:
        raise ValueError("total data size is not a multiple of the data size")
    segment_data['num_chunks'] = total_data_size // data_size
    for obj in segment_data['objects']:
        if obj['has_data']:
            obj['values'] = obj['values'] * segment_data['num_chunks']
    return segment_data
######################################################################################################################## | |
def read_segment(file, offset=0, last_segment=None):
    """Read one segment's lead-in and metadata starting at *offset*.

    *last_segment* supplies the previous segment's object layout for
    objects that inherit it.  Returns a dict holding the ToC flags, the
    version, the two lead-in offsets, the position of the next segment in
    the index file, and the list of object descriptions (with chunk
    counts applied).  Raises RuntimeError when the segment tag is wrong.
    """
    file.seek(offset)
    head = file.read(4).decode("utf-8")
    if head != "TDSh":
        raise RuntimeError("Segment does not start with TDSh")
    segment_data = {}
    toc_mask = struct.unpack('<i', file.read(4))[0]
    # Expand the bit mask into one boolean per ToC flag.
    segment_data['toc'] = {flag: bool(toc_mask & bit) for flag, bit in tocProperties.items()}
    segment_data['version'] = struct.unpack('<i', file.read(4))[0]
    (segment_data['raw_data_offset'],
     segment_data['next_segment_offset']) = struct.unpack('<QQ', file.read(16))
    # Offsets are relative to the end of the 28-byte (7 * 4) lead-in.
    segment_data['next_segment_pos'] = segment_data['next_segment_offset'] + offset + (7 * 4)
    segment_data['objects'] = []
    if segment_data['toc']['kTocMetaData']:
        num_obj = struct.unpack("<l", file.read(4))[0]
        for _ in range(num_obj):
            object_data = read_object(file)
            if object_data['inherit_last_segment'] and last_segment is not None:
                # Copy the raw-data layout from the object with the same
                # path in the previous segment.
                for prev in last_segment['objects']:
                    if prev['path'] == object_data['path']:
                        object_data['dtype'] = prev['dtype']
                        object_data['dimensions'] = prev['dimensions']
                        object_data['values'] = prev['values']
                        object_data['data_size'] = prev['data_size']
            segment_data['objects'].append(object_data)
    return calc_chunks(segment_data)
######################################################################################################################## | |
def read_segments(file_path):
    """Read every segment of the .tdms_index file at *file_path*.

    Returns the list of segment dicts in file order, passing each segment
    to the next read so inherited object layouts can be resolved.
    Returns None when *file_path* is not a regular file (behaviour
    preserved from the original -- callers may test for truthiness).
    """
    if not os.path.isfile(file_path):
        return None
    file_size = os.stat(file_path).st_size
    segments = []
    # 'with' guarantees the handle is closed even if a corrupt segment
    # makes read_segment raise (the original leaked the file on error).
    with open(file_path, 'rb') as file:
        offset = 0
        last_segment = None
        while offset < file_size:
            segment = read_segment(file, offset=offset, last_segment=last_segment)
            segments.append(segment)
            # Each segment's lead-in records where the next one starts.
            offset = segment['next_segment_pos']
            last_segment = segment
    return segments
######################################################################################################################## | |
def collect_objects(segments):
    """Flatten the object lists of all *segments* into one list."""
    objects = []
    for segment in segments:
        objects.extend(segment.get('objects', []))
    return objects
def filter_objects_with_data(objects):
    """Return only the objects that carry raw data."""
    return [obj for obj in objects if obj['has_data']]
def filter_objects_by_channel(objects, channel):
    """Return the objects whose path has the form /<group>/'<channel>'."""
    pattern = re.compile("/[^/]+?/'{}'$".format(channel))
    matching = []
    for obj in objects:
        if pattern.match(obj['path']):
            matching.append(obj)
    return matching
######################################################################################################################## |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment