Last active
August 14, 2023 01:22
-
-
Save hansemro/311107a5bf4a5e5733b729a947f09b0b to your computer and use it in GitHub Desktop.
[WIP][SDS2000X Plus] Python script to read all waveforms
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
# Copyright (c) 2023 Hansem Ro
#
# Read all waveforms from Siglent SDS2000X Plus
# - Segmented captures will be combined to form a larger capture.
# - 8/10-bit ADC supported with any :WAV:WIDTH
#
# Usage:
# 0. pip install socketscpi matplotlib
# 1. Change ipAddress
# 2. Capture data on scope with single trigger. Feel free to enable segmented capture or use any
#    number of channels.
# 3. Run this script and view plots of each channel.
#
# Tasks:
# - [x] Read all analog waveforms
#   - [x] Support non-segmented capture
#     - [x] Read individual frame data in chunks if necessary
#       - necessary if number of samples in a frame > :WAV:MAXPOINT?
#   - [x] Support segmented capture
#     - [ ] Reliably handle segment frame captures in segment chunks if supported
#       - necessary if number of segments is greater than 50.
#       - Read in data chunks if necessary
#     - [x] Support segmented capture via :HISTORY
#       - Supports FW older than 1.5.2R1
#   - [x] Support 8/10-bit ADC modes
#     - [x] Support :ACQ:RES 8Bits + :WAV:WIDTH BYTE
#     - [x] Support :ACQ:RES 10Bits + :WAV:WIDTH WORD
#     - [x] Support :ACQ:RES 8Bits + :WAV:WIDTH WORD
#     - [x] Support :ACQ:RES 10Bits + :WAV:WIDTH BYTE
#   - [ ] Replace matplotlib to support 200Mpts
# - [x] Read all digital waveforms
# - [x] Read all math function waveforms
#   - [x] Support FW >= 1.5.2R1
#   - [x] Support FW < 1.5.2R1
import math
import struct
import time as t
from datetime import datetime

import matplotlib.pyplot as pl
import socketscpi
# Address of the oscilloscope; passed to socketscpi.SocketInstrument when the
# script runs. Must be edited before use (see "Usage" in the file header).
ipAddress = 'CHANGE_ME'
def query_b(sds, cmd):
    """Send *cmd* as a query and return the reply as a mutable byte buffer.

    The reply string returned by ``sds.query`` is encoded with latin_1, which
    maps every code point 0-255 to the byte of the same value.
    """
    reply_text = sds.query(cmd)
    reply_bytes = reply_text.encode('latin_1')
    return bytearray(reply_bytes)
def read_b(sds):
    """Read a pending reply from *sds* and return it as a mutable byte buffer.

    The string returned by ``sds.read`` is encoded with latin_1, which maps
    every code point 0-255 to the byte of the same value.
    """
    reply_text = sds.read()
    reply_bytes = reply_text.encode('latin_1')
    return bytearray(reply_bytes)
class SiglentWaveDesc:
    """
    Siglent WaveDesc + Data Handler

    Holds the acquisition parameters parsed from a WAVEDESC block and converts
    raw sample bytes into (time, value) lists for analog/math and digital
    sources.
    """

    # Size in bytes of the fixed WAVEDESC fields read by parse_desc(); any
    # bytes after this offset are treated as per-frame timestamp records.
    _DESC_FIXED_SIZE = 346
    # Distance in bytes between consecutive timestamp records.
    _TIMESTAMP_STRIDE = 16
    # Bytes actually used in each timestamp record:
    # double seconds, u8 minute, u8 hour, u8 day, u8 month, i16 year.
    _TIMESTAMP_FORMAT = '<dBBBBh'
    _TIMESTAMP_SIZE = 14

    def __init__(self, ch: str):
        """Create an empty descriptor for the channel named *ch* (e.g. "C1")."""
        self.ch = ch
        self.has_timestamp = False
        self.timestamps = []
        # comm_type: 0-BYTE, 1-WORD
        self.comm_type = 0
        # comm_order: 0-LSB, 1-MSB
        self.comm_order = 0
        # WAVEDESC length in bytes
        self.length = 0
        # number of bytes in a waveform frame
        self.data_bytes = 0
        # number of samples in a waveform frame
        self.unit_frame_points = 0
        # sample point offset (same as point index in :WAVEFORM:START)
        self.first_point = 0
        # number of points between data samples
        self.data_interval = 0
        # number of frames in the data fetch
        # - ignore if 0 or larger than total_frames
        self.read_frames = 0
        # total number of segment frames
        self.total_frames = 0
        # vertical gain
        self.v_gain = 0.0
        # vertical offset
        self.v_offset = 0.0
        # (New in 1.5.2R1) code_per_div for given ADC resolution
        self.code_per_div = 0.0
        # ADC resolution in bits
        self.adc_bit = 8
        # frame index (first index at 1)
        self.frame_index = 1
        # sampling interval in s/Sa
        self.h_interval = 0.0
        # trigger delay offset
        self.h_offset = 0.0
        self.tdiv_index = 0
        # vertical coupling: 0-DC, 1-AC, 2-GND (integer field in WAVEDESC)
        self.v_coupling = 0
        # channel probe attenuation
        self.probe_factor = 0.0
        self.fixed_v_gain = 0
        # bandwidth limit: 0-OFF, 1-20M, 2-200M
        self.bw_limit = 0
        # wave source index (starting from 0)
        self.source = 0

    def parse_desc(self, desc: bytearray):
        """Parse a WAVEDESC block and store its fields on this object.

        Every parsed field is also printed. Timestamp records found after the
        fixed fields are appended to ``self.timestamps`` (duplicates are
        skipped); the list is not cleared first.

        Args:
            desc: WAVEDESC bytes with the transfer header already removed.

        Raises:
            ValueError: if the block does not start with the expected
                'WAVEDESC'/'WAVEACE' markers, is shorter than the fixed
                fields, or is not LSB-first.
        """
        if desc[0:8] != b'WAVEDESC':
            raise ValueError("descriptor does not start with 'WAVEDESC'")
        if desc[16:23] != b'WAVEACE':
            raise ValueError("descriptor template name is not 'WAVEACE'")
        if len(desc) < self._DESC_FIXED_SIZE:
            raise ValueError(
                f"WAVEDESC too short: {len(desc)} bytes, "
                f"need at least {self._DESC_FIXED_SIZE}")

        self.comm_type, self.comm_order, self.length = struct.unpack_from('<hhl', desc, 32)
        print(f"comm_type: {'byte' if self.comm_type == 0 else 'word'}")
        print(f"comm_order: {'LSB' if self.comm_order == 0 else 'MSB'}")
        print(f"wavdesc length: {self.length}")
        # All decoding below (and of the sample data) assumes little-endian.
        if self.comm_order != 0:
            raise ValueError("only LSB-first (comm_order == 0) data is supported")

        # number of bytes in the first data array.
        self.data_bytes = struct.unpack_from('<l', desc, 60)[0]
        # number of points in data array
        self.unit_frame_points = struct.unpack_from('<l', desc, 116)[0]
        print(f"data_bytes: {self.data_bytes}")
        print(f"unit_frame_points: {self.unit_frame_points}")

        self.first_point, self.data_interval = struct.unpack_from('<ll', desc, 132)
        print(f"first_point: {self.first_point}")
        print(f"data_interval: {self.data_interval}")

        self.read_frames, self.total_frames = struct.unpack_from('<ll', desc, 144)
        print(f"read_frames: {self.read_frames}")
        print(f"total_frames: {self.total_frames}")

        self.v_gain, self.v_offset, self.code_per_div = struct.unpack_from('<fff', desc, 156)
        print(f"v_gain: {self.v_gain}")
        print(f"v_offset: {self.v_offset}")
        print(f"code_per_div: {self.code_per_div}")

        self.adc_bit, self.frame_index, self.h_interval, self.h_offset = struct.unpack_from('<hhfd', desc, 172)
        # srate is only reported; a zero interval must not abort parsing.
        srate = 1.0 / self.h_interval if self.h_interval else float('inf')
        print(f"adc_bit: {self.adc_bit}")
        print(f"frame_index: {self.frame_index}")
        print(f"h_interval: {self.h_interval}")
        print(f"srate: {srate}")
        print(f"h_offset: {self.h_offset}")

        (self.tdiv_index, self.v_coupling, self.probe_factor,
         self.fixed_v_gain, self.bw_limit) = struct.unpack_from('<hhfhh', desc, 324)
        print(f"tdiv_index: {self.tdiv_index}")
        print(f"v_coupling: {self.v_coupling}")
        print(f"probe_factor: {self.probe_factor}")
        print(f"fixed_v_gain: {self.fixed_v_gain}")
        print(f"bw_limit: {self.bw_limit}")

        self.source = struct.unpack_from('<h', desc, 344)[0]
        print(f"source: {self.source}")

        self.has_timestamp = len(desc) > self._DESC_FIXED_SIZE
        if self.has_timestamp:
            print("timestamps:")
            for i in range(self._DESC_FIXED_SIZE, len(desc), self._TIMESTAMP_STRIDE):
                if len(desc) - i < self._TIMESTAMP_SIZE:
                    # Truncated trailing record: nothing more to decode.
                    break
                second, minute, hour, day, month, year = struct.unpack_from(
                    self._TIMESTAMP_FORMAT, desc, i)
                timestamp = datetime.strptime(
                    f"{year}-{month}-{day} {hour}:{minute}:{second:.6f}",
                    "%Y-%m-%d %H:%M:%S.%f")
                if timestamp not in self.timestamps:
                    print(f"\tadding {timestamp}")
                    self.timestamps.append(timestamp)
                else:
                    print("\ttimestamp exists, skipping")

    def process_analog_wave(self, data: bytearray):
        """Convert raw analog/math sample bytes into time and voltage lists.

        *data* holds ``total_frames`` frames back to back, each with
        ``unit_frame_points`` little-endian signed samples of 1 byte
        (comm_type 0) or 2 bytes (comm_type 1). Frames after the first are
        shifted in time by the difference between consecutive entries of
        ``self.timestamps``. Sample slots missing from a short buffer decode
        as 0.

        Args:
            data: concatenated sample bytes of all frames.

        Returns:
            Tuple ``(time, voltages)`` of equally long lists.

        Raises:
            ValueError: if more than one frame is present but there are fewer
                timestamps than frames, or if consecutive timestamps do not
                increase.
        """
        num_samples = self.unit_frame_points * self.total_frames
        print(f"num_samples: {num_samples}")
        num_samples_per_frame = self.unit_frame_points
        print(f"num_samples_per_frame: {num_samples_per_frame}")
        num_bytes_per_sample = self.comm_type + 1
        num_bytes_per_frame = num_samples_per_frame * num_bytes_per_sample

        # For correct voltage scaling (on SDS2000X+), code_per_div must be
        # 30 when :WAV:WIDTH is BYTE and 7680 (30 * 256) when it is WORD,
        # regardless of the ADC resolution. Deriving it from the sample width
        # works for all firmware versions; the WAVEDESC code_per_div field is
        # only available from 1.5.2R1 on.
        code_per_div = 30
        if num_bytes_per_sample == 2:
            code_per_div *= 256
        print(f"final code_per_div: {code_per_div}")
        v_gain = self.v_gain * self.probe_factor / code_per_div
        print(f"final v_gain: {v_gain}")
        v_offset = self.v_offset * self.probe_factor

        print(f"num_timestamps: {len(self.timestamps)}")
        # Fail up front instead of with an IndexError in the middle of the loop.
        if self.total_frames > 1 and len(self.timestamps) < self.total_frames:
            raise ValueError(
                f"{self.total_frames} frames need {self.total_frames} timestamps, "
                f"got {len(self.timestamps)}")

        time = []
        voltages = []
        h_interval = self.h_interval
        h_offset = 0
        for seg in range(self.total_frames):
            if seg > 0:
                delta = (self.timestamps[seg] - self.timestamps[seg - 1]).total_seconds()
                if delta <= 0:
                    raise ValueError(
                        f"({seg}) {self.timestamps[seg]} - {self.timestamps[seg - 1]} "
                        "should be greater than 0")
                h_offset += delta
            seg_data = data[seg * num_bytes_per_frame:(seg + 1) * num_bytes_per_frame]
            for i in range(num_samples_per_frame):
                raw = seg_data[i * num_bytes_per_sample:(i + 1) * num_bytes_per_sample]
                value = int.from_bytes(raw, byteorder='little', signed=True)
                voltages.append(float(value) * v_gain - v_offset)
                time.append(i * h_interval + h_offset)
        return time, voltages

    def process_digital_wave(self, data: bytearray, num_samples: int):
        """Unpack 1-bit digital samples into time and value lists.

        Each byte of *data* carries 8 samples, least-significant bit first.
        Sample number ``k`` (counting bits across the whole buffer) is placed
        at ``k * h_interval`` so every bit gets its own time stamp.

        Args:
            data: packed sample bytes.
            num_samples: number of samples to unpack; only whole bytes
                (``num_samples // 8``) are used, and never more bytes than
                *data* actually contains.

        Returns:
            Tuple ``(time, values)`` of equally long lists; values are 0 or 1.
        """
        time = []
        values = []
        print(f"num_digital_samples: {num_samples}")
        h_interval = self.h_interval
        # Clamp to the buffer so a short read does not raise IndexError.
        num_bytes = min(num_samples // 8, len(data))
        for i in range(num_bytes):
            byte_samples = data[i]
            for j in range(8):
                values.append((byte_samples >> j) & 1)
                time.append((i * 8 + j) * h_interval)
        return time, values
def get_enabled_channels(sds):
    """Ask the scope which channels are switched on.

    Queries analog channels C1-C4, math functions F1-F4 and, only when the
    digital bus reports "ON", digital lines D0-D15. The state of every
    queried channel is printed.

    Returns:
        List of enabled channel names in query order, e.g. ["C1", "F2", "D3"].
    """
    active = []

    def probe(label, command):
        # A channel counts as enabled only on an exact "ON" reply.
        if sds.query(command).strip() == "ON":
            print(f"{label} is ON")
            active.append(label)
        else:
            print(f"{label} is OFF")

    # analog channels
    for n in range(1, 5):
        probe(f"C{n}", f":CHANNEL{n}:SWITCH?")

    # math function channels
    for n in range(1, 5):
        probe(f"F{n}", f":FUNCTION{n}?")

    # digital channels
    if sds.query(":DIGITAL?").strip() == "ON":
        for n in range(16):
            probe(f"D{n}", f":DIGITAL:D{n}?")

    return active
def fetch_frame_data(sds, wd: "SiglentWaveDesc"):
    """Read one complete waveform frame from the scope, in chunks if needed.

    Repeats ``:WAVEFORM:START <n>`` / ``:WAVEFORM:DATA?`` until at least
    ``wd.unit_frame_points`` samples have been collected. Each reply is
    expected to carry a definite-length block header ``#<d><length>`` (``d``
    digits of byte count) followed by the payload; exactly ``length`` payload
    bytes are taken, so sample bytes that happen to equal a whitespace
    character are kept and the trailing terminator is dropped.

    Args:
        sds: instrument connection providing ``write(cmd)`` and ``query(cmd)``.
        wd: descriptor supplying ``comm_type`` (0 = 1 byte/sample,
            1 = 2 bytes/sample) and ``unit_frame_points``.

    Returns:
        Tuple ``(num_samples, data)``: samples counted from the block headers
        and the concatenated payload bytes.

    Raises:
        ValueError: if a reply has no well-formed block header.
        RuntimeError: if the scope returns a chunk with no samples before the
            frame is complete (this would otherwise loop forever).
    """
    # NOTE(review): the sample count is bytes // bytes-per-sample; for digital
    # sources, which pack 8 samples per byte, this may not line up with the
    # point index sent in :WAVEFORM:START. Confirm against the programming guide.
    num_samples = 0
    data = bytearray()
    num_bytes_per_sample = wd.comm_type + 1
    # read data in one or more parts
    while num_samples < wd.unit_frame_points:
        # set sample point index to start reads from
        sds.write(f":WAVEFORM:START {num_samples}")
        # fetch data (same latin_1 str -> bytes round trip as query_b)
        raw = bytearray(sds.query(":WAVEFORM:DATA?"), encoding='latin_1')

        header_start = raw.find(b'#')
        if header_start < 0:
            raise ValueError("no block header ('#') in :WAVEFORM:DATA? reply")
        digits_field = raw[header_start + 1:header_start + 2]
        if not digits_field.isdigit() or digits_field == b'0':
            raise ValueError("malformed block header in :WAVEFORM:DATA? reply")
        num_digits = int(digits_field)
        payload_start = header_start + 2 + num_digits
        length_field = raw[header_start + 2:payload_start]
        if len(length_field) != num_digits or not length_field.isdigit():
            raise ValueError("malformed block length in :WAVEFORM:DATA? reply")
        num_bytes = int(length_field)

        # get number of samples in the data fetch
        partial_num_samples = num_bytes // num_bytes_per_sample
        print(f"num samples in data fetch: {partial_num_samples}")
        if partial_num_samples == 0:
            raise RuntimeError(
                f"scope returned no samples at point {num_samples} "
                f"(expected {wd.unit_frame_points} in total)")

        # keep exactly the announced payload; drops header and terminator
        data += raw[payload_start:payload_start + num_bytes]
        num_samples += partial_num_samples
        del raw
    return num_samples, data
def _parse_history_time(reply, max_hour):
    """Convert a ':HISTORY:TIME?' reply into a datetime on a placeholder date.

    The reply format is "hour: minute: second. microsecond" and carries no
    date, so 2000-01-01 is used; once the hour drops below the largest hour
    seen so far the frame is placed on 2000-01-02 so timestamps keep
    increasing across midnight.

    Returns:
        Tuple ``(timestamp, max_hour)`` with the updated largest hour.
    """
    # NOTE(review): the fractional field is read as an integer microsecond
    # count, which assumes the scope always sends 6 digits - TODO confirm.
    hour, minute, second = reply.split(":", 2)
    second, microsecond = second.split('.')
    hour = int(hour)
    minute = int(minute)
    second = int(second)
    microsecond = int(microsecond)
    day = 1
    if hour > max_hour:
        max_hour = hour
    elif hour < max_hour:
        # new day when hour rolls back to 0
        day = 2
    timestamp = datetime.strptime(
        f"2000-1-{day} {hour}:{minute}:{second}.{microsecond:06}",
        "%Y-%m-%d %H:%M:%S.%f")
    return timestamp, max_hour


def fetch_waves(sds, plot=False):
    """Fetch, decode and optionally plot every enabled channel.

    For each channel reported by get_enabled_channels() the waveform source is
    selected, its parameters are read (WAVEDESC for analog/math channels,
    :DIGITAL:POINTS?/:DIGITAL:SRATE? for digital ones) and the sample data is
    fetched. With segmented acquisition enabled, frames are read one by one
    through :HISTORY together with their timestamps.

    WARNING: all samples of a channel are held in memory at once; this may
    consume lots of RAM for larger memory depths.

    Args:
        sds: instrument connection providing ``write(cmd)`` and ``query(cmd)``.
        plot: when true, show a matplotlib figure per channel.
    """
    enabled_channels = get_enabled_channels(sds)
    for ch in enabled_channels:
        print(f"Waveform {ch}:")
        sds.write(f":WAVEFORM:SOURCE {ch}")
        # Start data fetch with the first sample point
        sds.write(":WAVEFORM:START 0")
        # Fetch maximum number of sample points (constrained by :WAVEFORM:MAXPOINT?)
        sds.write(":WAVEFORM:POINT 0")
        # Fetch single segment frame (starting from the first frame)
        sds.write(":WAVEFORM:SEQUENCE 1,1")
        # The reply is currently unused; the query is kept so the command
        # sequence sent to the scope stays the same.
        sds.query(":WAVEFORM:MAXPOINT?")

        # Fetch and parse parameters
        wd = SiglentWaveDesc(ch)
        is_digital = "D" in ch
        if is_digital:
            wd.unit_frame_points = int(float(sds.query(":DIGITAL:POINTS?")))
            wd.data_bytes = wd.unit_frame_points
            s_rate = float(sds.query(":DIGITAL:SRATE?"))
            wd.h_interval = 1.0 / s_rate
            segment_cap_en = False
        else:
            # Fetch WAVEDESC
            recv_all = query_b(sds, ":WAVEFORM:PREAMBLE?")
            # skip the 11-byte "#9" + 9-digit block header
            wavedesc = recv_all[recv_all.find(b'#') + 11:]
            wd.parse_desc(wavedesc)
            # strip like every other ON/OFF check so a trailing newline in
            # the reply cannot hide an enabled sequence mode
            segment_cap_en = sds.query(":ACQUIRE:SEQUENCE?").strip() == "ON"
            if segment_cap_en:
                # get total number of sequence frames
                wd.total_frames = int(sds.query(":ACQUIRE:SEQUENCE:COUNT?"))
                # clear timestamps
                wd.timestamps = []

        # gather all data (including all segmented frame captures)
        if not segment_cap_en:
            # get entire frame data
            num_samples, data = fetch_frame_data(sds, wd)
        else:
            num_samples = 0
            data = bytearray()
            max_hour = 0
            sds.write(":HISTORY ON")
            try:
                # for each segment frame
                for seg in range(1, wd.total_frames + 1):
                    sds.write(f":HISTORY:FRAME {seg}")
                    timestamp, max_hour = _parse_history_time(
                        sds.query(":HISTORY:TIME?"), max_hour)
                    print(timestamp)
                    if timestamp not in wd.timestamps:
                        wd.timestamps.append(timestamp)
                    # get entire frame data
                    seg_num_samples, seg_data = fetch_frame_data(sds, wd)
                    num_samples += seg_num_samples
                    data += seg_data
                    del seg_data
            finally:
                # leave history mode even when a fetch fails
                sds.write(":HISTORY OFF")

        # process data
        if is_digital:
            time, values = wd.process_digital_wave(data, num_samples)
        else:
            # analog ("C") and math function ("F") channels share one decoder
            time, values = wd.process_analog_wave(data)

        if plot:
            pl.figure()
            pl.plot(time, values)
            pl.grid()
            pl.show()
            pl.close()

        # release the large buffers before the next channel is fetched
        del time
        del values
        # newline
        print()
if __name__ == '__main__':
    # Connect to the scope at ipAddress, then fetch and plot every enabled channel.
    sds = socketscpi.SocketInstrument(ipAddress)
    try:
        fetch_waves(sds, plot=True)
    finally:
        # close the socket even if fetching or plotting fails
        sds.close()
Author
hansemro
commented
Aug 9, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment