Skip to content

Instantly share code, notes, and snippets.

@neurodroid
Last active June 17, 2018 18:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save neurodroid/213f08570654de3a5c3b3e6e0a27fa27 to your computer and use it in GitHub Desktop.
Save neurodroid/213f08570654de3a5c3b3e6e0a27fa27 to your computer and use it in GitHub Desktop.
Reads Intan Technologies CLAMP data file generated by controller GUI.
"""
Reads Intan Technologies CLAMP data file generated by controller GUI.
Christoph Schmidt-Hieber
2016-11-05
Mostly a Python version of read_Intan_CLP_file.m from
http://www.intantech.com/files/Intan_CLAMP_software_compiled_v1_0.zip
as of 2016-11-05
Example:
>>> import matplotlib.pyplot as plt
>>> import intan
>>> intan_file = intan.IntanFile('myexperiment_A_160916_142731.clp')
>>> plt.plot(intan_file.data["Time"], intan_file.data["Measured"])
>>> intan_file = intan.IntanFile('myexperiment_AUX_160916_142731.clp')
>>> plt.plot(intan_file.data["Time"], intan_file.data["ADC"][1])
"""
import sys
import numpy as np
class IntanFile(object):
    """
    Intan Technologies CLAMP data file generated by controller GUI.

    Attributes
    ----------
    filename : str
        Path of the file this object reads from.
    data : dict
        Dictionary containing the file data (set by `read`).
    header : dict
        Dictionary containing the file metadata (set by `read`).
    """

    # Value expected in the first four bytes of every CLAMP file.
    _MAGIC_NUMBER = 0xf3b1a481
    # Offset removed from, and scale then applied to, raw 16-bit ADC words.
    _ADC_OFFSET = 2**15
    _ADC_SCALE = 0.0003125

    def __init__(self, filename, read=True, verbose=0):
        """
        Open a file for reading.

        Parameters
        ----------
        filename : str
            File name (full path)
        read : bool, optional
            Read in file data upon initialization. Default: True
        verbose : int, optional
            Print information while reading. Default: 0
        """
        self.filename = filename
        if read:
            self.read(verbose=verbose)

    def read(self, verbose=0):
        """
        Read the header and all data records from the file.

        Populates `self.header` and `self.data`.

        Parameters
        ----------
        verbose : int, optional
            Print information while reading. Default: 0

        Raises
        ------
        RuntimeError
            If the file is not a recognized CLAMP file, is truncated inside
            the header, or the header size does not match its NumBytes field.
        """
        # The context manager closes the handle; no explicit close() needed.
        with open(self.filename, 'rb') as self.fid:
            self._read_header(verbose)
            if self.header["datatype"] == 0:
                self._read_data()
            else:
                self._read_aux_data()

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------
    def _read_values(self, dtype, count):
        """Read exactly `count` items of `dtype`; raise on a short read."""
        values = np.fromfile(self.fid, dtype, count=count)
        if values.size != count:
            raise RuntimeError("Unexpected end of file.")
        return values

    def _read_scalar(self, dtype):
        """Read a single item of `dtype` and return it as a NumPy scalar."""
        return self._read_values(dtype, 1)[0]

    def _read_records(self, fields):
        """
        Read all complete fixed-size records from the current position.

        Parameters
        ----------
        fields : list of (str, dtype)
            Packed record layout, in file order.

        Returns
        -------
        numpy.ndarray
            Structured array with one element per complete record. A
            trailing partial record, if any, is ignored.
        """
        record_dtype = np.dtype(fields)
        start_pos = self.fid.tell()
        self.fid.seek(0, 2)
        file_size = self.fid.tell()
        self.fid.seek(start_pos, 0)
        # Floor division: never request more records than are fully present.
        count = (file_size - start_pos) // record_dtype.itemsize
        return np.fromfile(self.fid, record_dtype, count=count)

    @staticmethod
    def _unpack_bits16(words):
        """
        Expand 16-bit words into individual bits.

        Returns a uint8 array of shape (16, len(words)) in which row k holds
        bit k of every word (row 0 is the least significant bit).
        """
        shifts = np.arange(16, dtype=np.uint16)[:, np.newaxis]
        words = np.asarray(words, dtype=np.uint16)[np.newaxis, :]
        return ((words >> shifts) & 1).astype(np.uint8)

    # ------------------------------------------------------------------
    # Header parsing
    # ------------------------------------------------------------------
    def _read_segment(self):
        """Read one waveform segment description from the header."""
        segment = {}
        segment["WaveformNumber"] = self._read_scalar(np.uint8)
        # The three indices are stored shifted by one relative to the file.
        segment["TOffset"] = self._read_scalar(np.uint32) + 1
        segment["Start"] = self._read_scalar(np.uint32) + 1
        segment["End"] = self._read_scalar(np.uint32) + 1
        segment["AppliedValue"] = self._read_scalar(np.float32)
        return segment

    def _read_waveform(self):
        """Read the waveform interval followed by its list of segments."""
        waveform = {}
        waveform["Interval"] = self._read_scalar(np.float32)
        num_segments = int(self._read_scalar(np.uint16))
        waveform["Segments"] = [
            self._read_segment() for _ in range(num_segments)]
        return waveform

    def _read_header_voltage_clamp_settings(self):
        """Read the five float32 voltage-clamp settings, in file order."""
        return {
            "HoldingVoltage": self._read_scalar(np.float32),
            "NominalResistance": self._read_scalar(np.float32),
            "Resistance": self._read_scalar(np.float32),
            "DesiredBandwidth": self._read_scalar(np.float32),
            "ActualBandwidth": self._read_scalar(np.float32)
        }

    def _read_header_current_clamp_settings(self):
        """Read the two float32 current-clamp settings, in file order."""
        return {
            "HoldingCurrent": self._read_scalar(np.float32),
            "CurrentStepSize": self._read_scalar(np.float32)
        }

    def _read_header_settings(self):
        """
        Read the recording settings section of a clamp-data header.

        "FilterCutoff" is only stored when the value in the file is
        positive. Depending on "IsVoltageClamp", either a "VoltageClamp" or
        a "CurrentClamp" sub-dictionary is present.
        """
        settings = {
            "EnableCapacitiveCompensation": self._read_scalar(np.uint8),
            "CapCompensationMagnitude": self._read_scalar(np.float32)
        }
        filterf = self._read_scalar(np.float32)
        if filterf > 0:
            settings["FilterCutoff"] = filterf
        settings["PipetteOffset"] = self._read_scalar(np.float32)
        settings["SamplingRate"] = self._read_scalar(np.float32)
        settings["CellParameters.Rs"] = self._read_scalar(np.float32)
        settings["CellParameters.Rm"] = self._read_scalar(np.float32)
        settings["CellParameters.Cm"] = self._read_scalar(np.float32)
        settings["IsVoltageClamp"] = self._read_scalar(np.uint8)
        settings["vClampX2mode"] = self._read_scalar(np.uint8)
        if settings["IsVoltageClamp"]:
            settings["VoltageClamp"] = \
                self._read_header_voltage_clamp_settings()
        else:
            settings["CurrentClamp"] = \
                self._read_header_current_clamp_settings()
        settings["Waveform"] = self._read_waveform()
        return settings

    def _read_one_header_channel(self):
        """Read the register and calibration block of a single channel."""
        channel = {
            "Registers": self._read_values(np.uint16, 14),
            "DifferenceAmpResidual": self._read_scalar(np.int32),
            "VoltageAmpResidual": self._read_scalar(np.int32)
        }
        # 16 calibration bytes, stored column-major as a 2 x 4 x 2 array
        # whose first axis selects coarse (0) or fine (1) calibration.
        best_calibration = self._read_values(np.uint8, 2 * 4 * 2)
        best_calibration = best_calibration.reshape((2, 4, 2), order='F')
        channel["CoarseCalibration"] = best_calibration[0, :, :].reshape(
            (4, 2), order='F')
        channel["FineCalibration"] = best_calibration[1, :, :].reshape(
            (4, 2), order='F')
        channel["FeedbackResistors"] = self._read_values(np.float32, 5)
        channel["DesiredBandwidth"] = self._read_scalar(np.float32)
        return channel

    def _read_one_header_chip(self, numChannels):
        """Read `numChannels` channel blocks followed by 4 chip registers."""
        return {
            "Channels": [
                self._read_one_header_channel()
                for _ in range(numChannels)],
            "ChipRegisters": self._read_values(np.uint16, 4)
        }

    def _read_header_chips(self):
        """Read the chip count, channels-per-chip count and all chips."""
        num_chips = int(self._read_scalar(np.uint16))
        num_channels = int(self._read_scalar(np.uint16))
        return [
            self._read_one_header_chip(num_channels)
            for _ in range(num_chips)]

    def _read_header_date(self):
        """Read six int16 date fields into ``self.header["date_*"]``."""
        for field in ("Year", "Month", "Day", "Hour", "Minute", "Second"):
            self.header["date_" + field] = self._read_scalar(np.int16)

    def _read_header(self, verbose=0):
        """
        Read and validate the file header into `self.header`.

        Parameters
        ----------
        verbose : int, optional
            If positive, write a one-line description of the file to stdout.

        Raises
        ------
        RuntimeError
            On a wrong magic number, an unknown data type, a truncated
            header, or when the file position after the header differs from
            the NumBytes field stored in it.
        """
        magic_number = self._read_scalar(np.uint32)
        if magic_number != self._MAGIC_NUMBER:
            raise RuntimeError('Unrecognized file type.')
        self.header = {
            "version_major": self._read_scalar(np.int16),
            "version_minor": self._read_scalar(np.int16),
            "datatype": self._read_scalar(np.int16)
        }
        version_string = "Data File, Version {0}.{1}\n".format(
            self.header["version_major"], self.header["version_minor"])
        if verbose > 0:
            sys.stdout.write("Reading Intan Technologies CLAMP")

        datatype = self.header["datatype"]
        if datatype == 0:
            # Clamp data file.
            if verbose > 0:
                sys.stdout.write(" " + version_string)
            num_bytes = int(self._read_scalar(np.uint16))
            self._read_header_date()
            self.header["Chips"] = self._read_header_chips()
            self.header["Settings"] = self._read_header_settings()
        elif datatype == 1:
            # Auxiliary data file.
            if verbose > 0:
                sys.stdout.write(" Auxiliary " + version_string)
            self.header["NumADCs"] = self._read_scalar(np.uint16)
            num_bytes = int(self._read_scalar(np.uint16))
            self._read_header_date()
            self.header["Settings"] = {
                "SamplingRate": self._read_scalar(np.float32)
            }
        else:
            raise RuntimeError("Unrecognized data type")

        # The header records its own length; use it as a consistency check.
        if self.fid.tell() != num_bytes:
            raise RuntimeError("Header NumBytes doesn't match number of bytes")

    # ------------------------------------------------------------------
    # Data parsing
    # ------------------------------------------------------------------
    def _read_data(self):
        """
        Read clamp data records into `self.data`.

        Each record is 16 bytes: a uint32 timestep followed by three float32
        values (applied/software clamp, total clamp value, measured value).
        Timesteps are divided by the header sampling rate to give "Time".
        """
        records = self._read_records([
            ("Time", np.uint32),
            ("Clamp", np.float32),
            ("TotalClamp", np.float32),
            ("Measured", np.float32),
        ])
        sampling_rate = self.header["Settings"]["SamplingRate"]
        self.data = {
            "Time": records["Time"].astype(np.float64) / sampling_rate,
            "Clamp": np.ascontiguousarray(records["Clamp"]),
            "TotalClamp": np.ascontiguousarray(records["TotalClamp"]),
            "Measured": np.ascontiguousarray(records["Measured"])
        }

    def _read_aux_data(self):
        """
        Read auxiliary data records into `self.data`.

        Each record holds a uint32 timestamp, a uint16 digital-input word, a
        uint16 digital-output word and ``NumADCs`` uint16 ADC words.

        The resulting dictionary contains:

        - "Time": timestamps divided by the header sampling rate.
        - "DigitalInAll" / "DigitalOutAll": (16, N) uint8 bit arrays, row k
          being bit k of the respective word.
        - "DigitalIn" / "DigitalOut": row 0 (least significant bit) of the
          arrays above.
        - "ADC": (NumADCs, N) float64 array,
          ``0.0003125 * (raw - 2**15)`` for each ADC word.
        """
        # Plain int so that size arithmetic cannot overflow a NumPy uint16.
        num_adcs = int(self.header["NumADCs"])
        adc_names = ["ADC%d" % i for i in range(num_adcs)]
        records = self._read_records(
            [("Time", np.uint32),
             ("DigitalIn", np.uint16),
             ("DigitalOut", np.uint16)] +
            [(name, np.uint16) for name in adc_names])

        diginbits = self._unpack_bits16(records["DigitalIn"])
        digoutbits = self._unpack_bits16(records["DigitalOut"])
        sampling_rate = self.header["Settings"]["SamplingRate"]

        self.data = {
            "Time": records["Time"].astype(np.float64) / sampling_rate,
            "DigitalIn": diginbits[0],
            "DigitalInAll": diginbits,
            "DigitalOut": digoutbits[0],
            "DigitalOutAll": digoutbits,
            # Remove the mid-scale offset first, then scale.
            "ADC": np.array([
                self._ADC_SCALE * (
                    records[name].astype(np.float64) - self._ADC_OFFSET)
                for name in adc_names])
        }
@neurodroid
Copy link
Author

2017-06-23 Fix file reading on Windows

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment