Skip to content

Instantly share code, notes, and snippets.

@flocke
Created June 14, 2016 16:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flocke/1c6adc38c7b0191245017946a891debf to your computer and use it in GitHub Desktop.
Very simple (and specific) python module to read a .tdms_index file produced by LabView (heavily based on npTDMS)
# -*- coding: utf-8 -*-
#
# DISCLAIMER:
# This module is very specific to our data structure, nevertheless it may provide a base for someone who wants to create
# a python module to parse their own .tdms_index files.
#
# CREDITS:
# A lot of the code re-used in this module is directly taken from npTDMS (https://github.com/adamreeve/npTDMS) and all
# credit goes to Adam Reeve as the original author.
#
########################################################################################################################
# Import the required system modules
import collections
import datetime
import numpy
import os
import os.path
import re
import struct
# Try to import pytz
# Try to import pytz; fall back to naive datetimes when it is not installed
try:
    import pytz
except ImportError:
    pytz = None
########################################################################################################################
## Definitions for reading tdms_index files, taken from the npTDMS module

# Bit masks for the "table of contents" bitfield in a segment lead-in.
tocProperties = {
    'kTocMetaData': (1 << 1),
    'kTocRawData': (1 << 3),
    'kTocDAQmxRawData': (1 << 7),
    'kTocInterleavedData': (1 << 5),
    'kTocBigEndian': (1 << 6),
    'kTocNewObjList': (1 << 2)
}

# Description of one TDMS data type: display name, struct format character,
# size in bytes and the matching numpy scalar type (None where unsupported).
DataType = collections.namedtuple("DataType", ('name', 'struct', 'length', 'nptype'))

# Type ids 0x00-0x0D are contiguous, the rest are sparse.
tdsDataTypes = dict(enumerate((
    DataType('tdsTypeVoid', None, 0, None),
    DataType('tdsTypeI8', 'b', 1, numpy.int8),
    DataType('tdsTypeI16', 'h', 2, numpy.int16),
    DataType('tdsTypeI32', 'l', 4, numpy.int32),
    DataType('tdsTypeI64', 'q', 8, numpy.int64),
    DataType('tdsTypeU8', 'B', 1, numpy.uint8),
    DataType('tdsTypeU16', 'H', 2, numpy.uint16),
    DataType('tdsTypeU32', 'L', 4, numpy.uint32),
    DataType('tdsTypeU64', 'Q', 8, numpy.uint64),
    DataType('tdsTypeSingleFloat', 'f', 4, numpy.single),
    DataType('tdsTypeDoubleFloat', 'd', 8, numpy.double),
    DataType('tdsTypeExtendedFloat', None, None, None),
    DataType('tdsTypeDoubleFloatWithUnit', None, 8, None),
    DataType('tdsTypeExtendedFloatWithUnit', None, None, None)
)))
tdsDataTypes.update({
    0x19: DataType('tdsTypeSingleFloatWithUnit', None, 4, None),
    0x20: DataType('tdsTypeString', None, None, None),
    # numpy.bool8 was removed in numpy 1.24; numpy.bool_ is the stable name.
    0x21: DataType('tdsTypeBoolean', 'b', 1, numpy.bool_),
    0x44: DataType('tdsTypeTimeStamp', 'Qq', 16, None),
    0xFFFFFFFF: DataType('tdsTypeDAQmxRawData', None, None, None)
})

if pytz:
    # Use UTC time zone if pytz is installed
    timezone = pytz.utc
else:
    timezone = None
########################################################################################################################
def read_string(file):
    """Read one length-prefixed UTF-8 string from *file*.

    TDMS strings are stored as a little-endian uint32 byte count followed by
    that many bytes of UTF-8 data.
    """
    (length,) = struct.unpack('<L', file.read(4))
    raw = file.read(length)
    return raw.decode("utf-8")
def read_type(file, data_type, endianness):
    """Read a single value of *data_type* from *file*.

    *endianness* is a struct byte-order prefix ('<' or '>').  Time stamps are
    decoded into datetime objects; every other supported type is unpacked with
    its struct format.  Raises ValueError for types without a struct format.
    """
    if data_type.name == 'tdsTypeTimeStamp':
        # TDMS time stamps are seconds since 1904-01-01 00:00:00 UTC (the TDMS
        # epoch, not the Unix epoch), ignoring leap seconds, plus a count of
        # 2**-64 fractions of a second.
        raw = file.read(data_type.length)
        (frac, seconds) = struct.unpack('{}{}'.format(endianness, data_type.struct), raw)
        epoch = datetime.datetime(1904, 1, 1, 0, 0, 0, tzinfo=timezone)
        # 5**6 / 2**58 equals 10**6 / 2**64: converts fractions to microseconds.
        micro = float(frac) * 5 ** 6 / 2 ** 58
        # timedelta arithmetic ignores leap seconds, which matches the format.
        return epoch + datetime.timedelta(seconds=seconds) + datetime.timedelta(microseconds=micro)
    if data_type.struct is not None and data_type.length is not None:
        raw = file.read(data_type.length)
        return struct.unpack('{}{}'.format(endianness, data_type.struct), raw)[0]
    raise ValueError("Unsupported data type to read, {}.".format(data_type.name))
########################################################################################################################
def read_property(file):
    """Read one object property (name, type, value) from *file*.

    Returns a dict with keys 'name', 'dtype' (a DataType tuple) and 'value'.
    """
    info = {}
    info['name'] = read_string(file)
    (type_id,) = struct.unpack("<L", file.read(4))
    dtype = tdsDataTypes[type_id]
    info['dtype'] = dtype
    if dtype.name == 'tdsTypeString':
        # Strings carry their own length prefix instead of a fixed width.
        info['value'] = read_string(file)
    else:
        info['value'] = read_type(file, dtype, "<")
    return info
def read_object(file):
    """Read the metadata block describing a single TDMS object.

    Returns a dict holding the object path, flags describing whether the
    object carries raw data (and whether its layout is inherited from the
    previous segment), the raw-data description when present (dtype,
    dimensions, values, data_size) and the list of properties.
    """
    obj = {}
    obj['path'] = read_string(file)
    (raw_data_index,) = struct.unpack("<L", file.read(4))
    if raw_data_index == 0xFFFFFFFF:
        # Marker: this object has no raw data in the segment.
        obj['has_data'] = False
        obj['inherit_last_segment'] = False
    elif raw_data_index == 0x00000000:
        # Marker: raw data layout is identical to the previous segment.
        obj['has_data'] = True
        obj['inherit_last_segment'] = True
    else:
        obj['has_data'] = True
        obj['inherit_last_segment'] = False
        (type_id,) = struct.unpack("<L", file.read(4))
        dtype = tdsDataTypes[type_id]
        obj['dtype'] = dtype
        (obj['dimensions'],) = struct.unpack("<L", file.read(4))
        (obj['values'],) = struct.unpack("<Q", file.read(8))
        if dtype.name in ('tdsTypeString', ):
            # Variable-width data stores its total byte size explicitly.
            (obj['data_size'],) = struct.unpack("<Q", file.read(8))
        else:
            obj['data_size'] = obj['values'] * dtype.length * obj['dimensions']
    # Properties follow for every object, regardless of raw-data flags.
    (num_props,) = struct.unpack("<L", file.read(4))
    obj['num_props'] = num_props
    obj['properties'] = []
    for _ in range(num_props):
        obj['properties'].append(read_property(file))
    return obj
def calc_chunks(segment_data):
    """Compute the number of raw-data chunks in a segment.

    A segment's metadata describes one chunk of raw data; when the raw-data
    section is larger, that chunk layout repeats.  Scales the 'values' count
    of every data-carrying object by the number of chunks and stores the
    result under 'num_chunks'.

    Raises ValueError when the sizes are negative or inconsistent.
    """
    data_size = sum(o['data_size'] for o in segment_data['objects'] if o['has_data'])
    # Both lead-in offsets are relative to the end of the lead-in, so the raw
    # data occupies the span between them.  The original code subtracted in
    # the opposite order (raw_data_offset - next_segment_offset), which yields
    # a negative size for any segment that actually carries data; npTDMS
    # computes next_segment_offset - raw_data_offset.
    total_data_size = segment_data['next_segment_offset'] - segment_data['raw_data_offset']
    if data_size < 0 or total_data_size < 0:
        raise ValueError("negative data size")
    if data_size == 0:
        # No object carries raw data: nothing to chunk.  Guards against a
        # ZeroDivisionError in the modulo below.
        if total_data_size != 0:
            raise ValueError("total data size is not a multiple of the data size")
        segment_data['num_chunks'] = 0
        return segment_data
    if total_data_size % data_size != 0:
        raise ValueError("total data size is not a multiple of the data size")
    segment_data['num_chunks'] = total_data_size // data_size
    for obj in segment_data['objects']:
        if obj['has_data']:
            obj['values'] = obj['values'] * segment_data['num_chunks']
    return segment_data
########################################################################################################################
def read_segment(file, offset=0, last_segment=None):
    """Parse one tdms_index segment starting at byte *offset* of *file*.

    Reads the lead-in (TOC flags, version, offsets) and, when the segment
    carries metadata, every object description.  Objects flagged to inherit
    their raw-data layout take it from the matching path in *last_segment*.
    Returns the segment dict after chunk counts have been computed.
    """
    file.seek(offset)
    segment = {}
    lead_in_tag = file.read(4).decode("utf-8")
    if lead_in_tag != "TDSh":
        raise RuntimeError("Segment does not start with TDSh")
    (toc_mask,) = struct.unpack('<i', file.read(4))
    # Expand the TOC bitfield into named booleans (avoid shadowing the
    # builtin "property" used by the original loop variable).
    segment['toc'] = {flag: (toc_mask & mask) != 0
                      for flag, mask in tocProperties.items()}
    (segment['version'],) = struct.unpack('<i', file.read(4))
    (segment['raw_data_offset'],
     segment['next_segment_offset']) = struct.unpack('<QQ', file.read(16))
    # The lead-in itself is 7 * 4 bytes; offsets are relative to its end.
    segment['next_segment_pos'] = segment['next_segment_offset'] + offset + (7 * 4)
    segment['objects'] = []
    if segment['toc']['kTocMetaData']:
        (num_objects,) = struct.unpack("<l", file.read(4))
        for _ in range(num_objects):
            obj = read_object(file)
            if obj['inherit_last_segment'] and last_segment is not None:
                # Copy the raw-data description from the object with the same
                # path in the previous segment.
                for prev in last_segment['objects']:
                    if prev['path'] == obj['path']:
                        obj['dtype'] = prev['dtype']
                        obj['dimensions'] = prev['dimensions']
                        obj['values'] = prev['values']
                        obj['data_size'] = prev['data_size']
            segment['objects'].append(obj)
    return calc_chunks(segment)
########################################################################################################################
def read_segments(file_path):
    """Read every segment of the tdms_index file at *file_path*.

    Returns the list of parsed segment dicts, or None when *file_path* is not
    a regular file (preserving the original implicit-None behaviour).
    """
    if not os.path.isfile(file_path):
        return None
    file_size = os.path.getsize(file_path)
    segments = []
    offset = 0
    last_segment = None
    # Use a context manager so the file is closed even when a malformed
    # segment raises part-way through (the original leaked the handle).
    with open(file_path, 'rb') as file:
        while offset < file_size:
            segment = read_segment(file, offset=offset, last_segment=last_segment)
            segments.append(segment)
            offset = segment['next_segment_pos']
            last_segment = segment
    return segments
########################################################################################################################
def collect_objects(segments):
    """Flatten the per-segment object lists into one list.

    Segments without an 'objects' key are skipped.
    """
    return [obj
            for seg in segments
            for obj in seg.get('objects', [])]
def filter_objects_with_data(objects):
    """Return only the objects whose metadata says they carry raw data."""
    with_data = []
    for obj in objects:
        if obj['has_data']:
            with_data.append(obj)
    return with_data
def filter_objects_by_channel(objects, channel):
    """Return the objects whose path names the given channel.

    Matches paths of the form /'group'/'channel' where the channel part is
    exactly *channel* (quoted), under any group.
    """
    pattern = re.compile("/[^/]+?/'{}'$".format(channel))
    matching = []
    for obj in objects:
        if pattern.match(obj['path']):
            matching.append(obj)
    return matching
########################################################################################################################
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment