Skip to content

Instantly share code, notes, and snippets.

@chhinze
Last active November 30, 2022 10:06
Show Gist options
  • Save chhinze/3541f1ed2131fc33642b61b7f1445055 to your computer and use it in GitHub Desktop.
Save chhinze/3541f1ed2131fc33642b61b7f1445055 to your computer and use it in GitHub Desktop.
MATLAB script to read dSPACE mat files to tabular format.
function tableData = dSpaceData2Table(matFile, names, interpolationMethod)
%dSpaceData2Table Loads a dSpace data set by names to a table.
%
% Input:
% matFile ... name of mat file to load (exported from dSpace), may be with
% relative or absolute path
% names ... (cell array, e.g. `{'a', 'b'}`). If provided, then these
% columns are read. names looks in dSpace variable names and
% logged Simulink Paths. Can also handle regex, e.g.
% names = {'^abc.*_new$'}
% will search for a name starting with `abc` and ending with
% `_new`. The given names will be converted to table names as
% well. They must be unique (distinguishable, otherwise loading
% fails.
% If no names are provided, then all fileds are tried to be
% loaded. Also here, the names must be distingushable.
% interpolationMethod method to interpolate when upsampling slower data
% rates. Any of {'linear', 'nearest', 'next', 'previous',
% 'pchip', 'cubic', 'v5cubic', 'makima', 'spline'}. Default is
% 'linear'
%
% Output:
% tableData ... loaded table with first column (`t`) being the recorded
% time.
%
% Examples:
% ```matlab
% tab1 = loadData2Table('myMatFile.mat', {'Position$'});
% plot(tab1.t, tab1.position);
%
% folder = 'C:/Users/st123456/thesis/data/practical/
% tab2 =
% loadData2Table(fullfile(folder, 'myMeasurements001.mat'), {'x_Des$',
% 'xd_Des$'});
% % ...
% ```
%
% Copyright © 2022 Christoph Hinze (MIT license)
%
% Permission is hereby granted, free of charge, to any person obtaining a copy of this
% software and associated documentation files (the “Software”), to deal in the Software
% without restriction, including without limitation the rights to use, copy, modify,
% merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
% permit persons to whom the Software is furnished to do so, subject to the following
% conditions:
%
% The above copyright notice and this permission notice shall be included in all copies
% or substantial portions of the Software.
%
% THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
% INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
% PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
% LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
% TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
% OR OTHER DEALINGS IN THE SOFTWARE.
if nargin<3 ||~any(strcmp(interpolationMethod, {'linear', 'nearest', 'next', 'previous', 'pchip', 'cubic', 'v5cubic', 'makima', 'spline'}))
interpolationMethod = 'linear';
end
fileContents = load(matFile);
structName = fieldnames(fileContents);
fileContents = fileContents.(structName{1});
%get fastetst time basis:
dt = cellfun(@(a)diff(a([1,2])), {fileContents.X(:).Data});
dt(strcmp({fileContents.X(:).Raster}, 'OnChange')) = Inf; %exclude OnChange
[~, minDtIdx] = min(dt);
minDtName = fileContents.X(minDtIdx).Raster;
t = fileContents.X(strcmp({fileContents.X.Raster}, minDtName)).Data(:);
if nargin==1 || isempty(names)
% Prozesse, die nicht ONChange geloggt werden, sind die für die
% wir uns interessieren.
idx = find(~strcmp({fileContents.Y.Raster}, 'OnChange'));
allPaths = {fileContents.Y.Path};
labelPaths = strcmp(allPaths, 'Labels'); % find labels and replace them
allPaths(labelPaths) = {fileContents.Y(labelPaths).Name};
idxPaths = allPaths(idx);
% Letzer Pfad-Teil nach "/" wird als Name verwendet. Voraussetzung:
% Eindeutig.
nn = cellfun( @(p)strsplit(p, '/'), idxPaths, 'UniformOutput', false);
names = cellfun(@(n)n{end}, nn, 'UniformOutput', false);
[~, ia] = unique(names);
multEntries = setdiff(1:length(names), ia);
for multIdx = multEntries
occurencyIdx = strcmp(names, names{multIdx});
if length(unique({fileContents.Y(idx(occurencyIdx)).Name})) == sum(occurencyIdx)
names(occurencyIdx) = {fileContents.Y(idx(occurencyIdx)).Name};
else
error('Fields with same path (%s) and name (%s) found. Cannot create table from this.', names{multIdx}, fileContents.Y(idx(occurencyIdx(1))).Name)
end
end
else
% find indices for names
if ischar(names)
names = {names};
end
names = names(:);
idx = cell(size(names));
allPaths = {fileContents.Y.Path};
allLabels = {fileContents.Y.Name};
for i = 1:numel(names)
name = names{i};
findPathResults = regexp(allPaths, name);
findLabelResults = regexp(allLabels, name);
idx_p = find(cellfun(@(s)~isempty(s), findPathResults));
idx_l = find(cellfun(@(s)~isempty(s), findLabelResults));
idx_i = union(idx_p, idx_l);
if isempty(idx_i)
error("Name '%s' not found in fields. Maybe not recorded?", name);
elseif length(idx_i) == 2 && length(idx_l) == 2 && strcmp(allLabels{idx_l(1)}, allLabels{idx_l(2)})
% case where label was accidentally recorded twice:
idx{i} = idx_i(1);
elseif length(idx_l) >1 && sum(cellfun(@(s)~isempty(s), regexp( allLabels(idx_l), [name, '.*\[\d+\]'])))>1%identify vector signals
[~,j] = sort( allLabels(idx_l(cellfun(@(s)~isempty(s), regexp( allLabels(idx_l), [name, '.*\[\d+\]'])))) );
idx{i} = idx_l(j);
elseif length(idx_i) >1
error("Name '%s' too broad. Results in match with %s", name, strjoin(strcat('"', allPaths(idx_i),'", ')));
else
idx{i} = idx_i(1);
end
end
% Herausfiltern von kürzeren Vektoren, die nicht in die Tabelle passen:
idxAllowed = find(~strcmp({fileContents.Y.Raster}, 'OnChange'));
isIdxIForbidden = ~cellfun(@isempty,cellfun(@(ii)setdiff(ii, idxAllowed), idx, 'UniformOutput', false));
if any(isIdxIForbidden)
error("Names %s are logged on change and, hence, do not have a time base. Cannot be used.",...
strjoin(strcat('"', allLabels(cell2mat({idx{isIdxIForbidden}})),'", ')));
end
names = matlab.lang.makeValidName(names,'ReplacementStyle','delete');
end
% find downsampling of other rasters:
nonStandardSamplingTimes = setdiff(unique({fileContents.X.Raster}, 'stable'), {'OnChange', minDtName}, 'stable');
periodicRasters = fileContents.X(ismember({fileContents.X.Raster}, nonStandardSamplingTimes));
downsamplingTimes = struct;
for i = 1:length(periodicRasters)
downsamplingTimes(i).Raster = periodicRasters.Raster;
downsamplingTimes(i).t = periodicRasters(i).Data;
end
%
tData = cellfun(@(i)cell2mat(arrayfun(@(j)fileContents.Y(j).Data.', i, 'UniformOutput', false)) , idx, 'UniformOutput', false);
for i = 1:length(nonStandardSamplingTimes)
% search other indices not being minDtName
nonStdTimeIdx = find(strcmp({fileContents.Y.Raster}, nonStandardSamplingTimes{i}));
idxDownsample = find(cell2mat(cellfun(@(i)any(ismember(i,nonStdTimeIdx)) ,idx ,'UniformOutput' ,false)));%intersect(idx, find(strcmp({fileContents.Y.Raster}, nonStandardSamplingTimes{i})));
%dsIdx = find(ismember(idx, idxDownsample));
for j=1:length(idxDownsample)
tData{idxDownsample(j)} = interp1( downsamplingTimes(i).t, double(tData{idxDownsample(j)}), t, interpolationMethod);
end
end
tData = [{t},tData(:)'];
tableData = table(tData{:}, 'VariableNames', [{'t'}, names(:)']);
end
from pymatreader import read_mat
import re
import pandas as pd
import numpy as np
from scipy.interpolate import interp1d
import scipy
from collections import OrderedDict
from typing import List
import matplotlib.pyplot as plt
import control
def clean_str_(s):
s = "".join(filter(lambda c: str.isidentifier(c) or str.isdecimal(c), s))
return s
def mat_get_field_names(matFile: str):
"""returns all data names in the dSpace mat file"""
data = read_mat(matFile)
keys = list(data.keys())
return data[keys[0]]["Y"]["Name"]
def mat_read_dspace(file: str, names_to_read: List[str], interp_method: str = "cubic"):
"""
file ... file name for the mat file to load (path to file)
names_to_read ... the fieldnames to read. Regex is allowed. For a list of available fields, see `mat_get_field_names(file)`
interp_method ... valid method for interp1d, e.g. 'linear', 'quadratic', 'cubic' [default] (see https://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.interp1d.html)
returns pandas.DataFrame with first column `t` being the time vector, other columns as specified in names_to_read.
"""
data = read_mat(file) # )
keys = list(data.keys())
host_time_idx = data[keys[0]]["X"]["Raster"].index("HostService")
all_times = dict(zip(data[keys[0]]["X"]["Raster"], data[keys[0]]["X"]["Data"]))
df_data = dict()
df_data["t"] = data[keys[0]]["X"]["Data"][host_time_idx]
names = data[keys[0]]["Y"]["Name"]
for name_to_read in names_to_read:
regex = re.compile(name_to_read)
idxs = [i for i, item in enumerate(names) if re.search(regex, item)]
if not idxs:
raise RuntimeError("Index " + name_to_read + " not found in dataset.")
elif len(idxs) > 1:
names_mult = [names[i] for i in idxs]
regex_isarray = re.compile(r"\[(\d+)\]$")
if all([regex_isarray.search(s) is not None for s in names_mult]):
indexed_arrays = [data[keys[0]]["Y"]["Data"][idx] for idx in idxs]
j = 0
for i in idxs:
if data[keys[0]]["Y"]["Raster"][i] != "HostService":
interp_obj = interp1d(
all_times[data[keys[0]]["Y"]["Raster"][i]],
indexed_arrays[j],
kind=interp_method,
)
indexed_arrays[j] = interp_obj(df_data["t"])
j += 1
for i, v in OrderedDict(zip(names_mult, indexed_arrays)).items():
df_data[clean_str_(i)] = v
# ordered_values_list = [v for i,v in OrderedDict(zip(names_mult, indexed_arrays)).items()]
# df_data[clean_str(name_to_read)] = np.vstack(ordered_values_list)
else:
sResults = ", ".join(names_mult)
raise RuntimeError(
"Index "
+ name_to_read
+ " too broad (found "
+ sResults
+ " in dataset)."
)
else:
if data[keys[0]]["Y"]["Raster"][idxs[0]] == "HostService":
df_data[clean_str_(name_to_read)] = data[keys[0]]["Y"]["Data"][idxs[0]]
else:
interp_obj = interp1d(
all_times[data[keys[0]]["Y"]["Raster"][idxs[0]]],
data[keys[0]]["Y"]["Data"][idxs[0]],
kind=interp_method,
)
df_data[clean_str_(name_to_read)] = interp_obj(df_data["t"])
return pd.DataFrame(data=df_data)
def plotBode(
f,
system,
unwrap=True,
gamma_sq=None,
figure=None,
label=None,
linestyle=None,
color=None,
) -> plt.Figure:
"""
Wrapper to plot bode diagrams from lti systems (control.StateSpace, control.TransferFunction,
scipy.signal.lti) as well as numeric data (e.g. from `freqresp()`).
Inputs:
f ... vector or list of frequencies
system ... system representation: lti system (control.StateSpace, control.TransferFunction,
scipy.signal.lti) or vector of complex frequency response values (same dimension as `f`).
Optional inputs:
unwrap ... [default:True] whether to unwrap the angle in the bode plot
gamma_sq ... [default:None] vector of coherence estimates od the system.
Will be plotted in third subplot
figure ... [default:None] if given, the diagram is plotted in an existing figure
label ... [default:None] a label for the legend of the plot
linestyle ... [default: None] modify the linestyle, e.g. one of {'-', ':', '-.', '--'}
color ... [default:None] modify the plot color. Should be a color known to pyplot.plot, a rgb triplet or hex string (e.g., '#c0ffee'),
see <https://matplotlib.org/stable/tutorials/colors/colors.html>
Returns a figure handle for the plot, which can be used to plot multiple bode plots in one diagram, e.g.
```python
from utils.io import plotBode
fig = plotBode(f, sys1, label="A system")
plotBode(f, sys2, label="Another system", figure=fig)
```
"""
if isinstance(
system, (control.statesp.StateSpace, control.xferfcn.TransferFunction)
):
mag, phase, _ = control.freqresp(
system, 2 * np.pi * f, squeeze=control.issiso(system)
)
phase = 180 / np.pi * phase
elif isinstance(system, scipy.signal.lti):
txy = scipy.signal.freqresp(system, 2 * np.pi * f)
mag = np.abs(txy[1])
phase = np.angle(txy[1], deg=True)
elif any(np.iscomplex(system)):
mag = np.abs(system)
phase = np.angle(system, deg=True)
else:
raise NotImplementedError(
"Only complex txy data is currently implemented as system."
)
if unwrap:
phase = np.unwrap(phase, period=360)
# Set Tex font rendering
# plt.rcParams.update({
# "text.usetex": True,
# "font.family": "sans-serif",
# "font.sans-serif": "Helvetica",
# })
if figure:
ax = figure.get_axes()
fig = figure
else:
fig, ax = plt.subplots(2 + 1*(gamma_sq is not None), 1, sharex=True)
line_style = linestyle if linestyle else "-"
color = color if color else next(ax[0]._get_lines.prop_cycler)['color'] # (use color cycler of ax1)
ax[0].semilogx(f, 20 * np.log10(mag), label=label, linestyle=line_style, color=color)
ax[0].grid(which="both")
ax[0].set_xlabel("Frequency /(rad/s)")
ax[0].set_ylabel("Magnitude /dB")
ax[0].legend(
fontsize="small",
)
ax[1].semilogx(f, phase, linestyle=line_style, color=color)
ax[1].grid(which="both")
ax[1].set_xlabel("Frequency /(rad/s)")
ax[1].set_ylabel("Phase /°")
if gamma_sq is not None:
ax[2].semilogx(f, gamma_sq, linestyle=line_style, color=color)
ax[2].grid(which="both")
ax[2].set_xlabel("Frequency /(rad/s)")
ax[2].set_ylabel("$\gamma^2 $/-")
return fig
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment