# Code used to capture a waveform from a Tektronix TDS3000-series oscilloscope and transfer it to Python.
# Hypoxic Capture via TDS 3034B
# Requires pyvisa and NI-VISA. Not tested much, but shared in case it is helpful to others.
import pyvisa | |
import numpy as np | |
import time as time | |
class Tek:
    """Minimal pyvisa wrapper for pulling waveforms off a Tek TDS3000 scope.

    Typical use is ``arm()`` -> ``capture()`` -> ``get_last_trace()``, then
    ``close()`` when done.

    Attributes set by ``__init__``:
        timeout: seconds that ``arm()`` and ``capture()`` poll before giving up.
        acqnum: number of traces returned by ``get_last_trace()`` so far.
        x_incr, x_zero: horizontal preamble values (WFMPre:XINcr / XZEro).
        y_incr, y_off, y_zero: vertical preamble values
            (WFMPre:YMUlt / YOFf / YZEro).
        record_length: points per record (WFMPre:NR_Pt), refreshed by
            ``get_last_trace()`` to the length of the last trace read.
    """

    def __init__(self, source="CH1", resource=None):
        """Open the instrument and configure a 16-bit binary curve transfer.

        Args:
            source: scope data source sent with ``data:source``.
            resource: VISA resource string to open, e.g.
                ``"TCPIP0::192.168.1.170::inst0::INSTR"``. When ``None``
                (the default) the first resource reported by the resource
                manager is used.

        Raises:
            IndexError: if ``resource`` is ``None`` and no VISA resource is
                found.
        """
        self.timeout = 5.0  # seconds; polling limit for arm() / capture()
        self.acqnum = 0
        self.rm = pyvisa.ResourceManager()
        if resource is None:
            found = self.rm.list_resources()
            if not found:
                # Same exception type an empty tuple index would raise, but
                # with a message that says what actually went wrong.
                raise IndexError("no VISA resources found")
            resource = found[0]
        self.resource = resource
        print("Connecting to %s" % (self.resource))
        self.dpo = self.rm.open_resource(self.resource)
        self.dpo.timeout = 3000  # pyvisa I/O timeout (milliseconds)
        self.dpo.encoding = 'latin_1'
        self.dpo.write_termination = None
        self.dpo.read_termination = '\n'
        print(self.dpo.query('*IDN?'))

        # Set up the transfer.
        self.dpo.write('data:source %s' % (source))
        self.dpo.write('header 0')
        self.dpo.write('data:encdg rib')  # signed integer, msb first
        self.dpo.write('data:width 2')
        self.dpo.write('data:resolution full')

        # Read the waveform preamble so traces can be scaled later.
        self.x_incr = float(self.dpo.query('WFMPre:XINcr?'))
        self.x_zero = float(self.dpo.query("WFMPre:XZEro?"))
        self.y_incr = float(self.dpo.query("WFMPre:YMUlt?"))
        self.y_off = float(self.dpo.query("WFMPre:YOFf?"))
        self.y_zero = float(self.dpo.query("WFMPre:YZEro?"))
        self.record_length = int(self.dpo.query("WFMPre:NR_Pt?"))
        print("x_zero %f" % self.x_zero)
        print("y_incr %f" % self.y_incr)
        print("y_off %f" % self.y_off)
        print("y_zero %f" % self.y_zero)
        print("record_length %d" % self.record_length)

    @staticmethod
    def _parse_curve_block(buffer):
        """Decode a ``#<n><length><data>`` block into big-endian int16 samples.

        The header is ``#``, one ASCII digit ``n``, then ``n`` ASCII digits
        giving the payload size in bytes. Anything after the payload (such as
        a trailing newline) is ignored.

        Args:
            buffer: raw bytes as returned by the instrument.

        Returns:
            A read-only numpy array of dtype ``>i2`` viewing ``buffer``.

        Raises:
            ValueError: if the header is malformed or the buffer holds fewer
                payload bytes than the header announces.
        """
        if buffer[:1] != b'#' or not buffer[1:2].isdigit():
            raise ValueError("malformed block header: %r" % (buffer[:12],))
        # Slice (not index) so we get the digit character rather than its
        # ASCII code; bytes indexing returns an int on Python 3.
        ndigits = int(buffer[1:2])
        start = 2 + ndigits
        length_field = buffer[2:start]
        if ndigits == 0 or len(length_field) != ndigits or not length_field.isdigit():
            raise ValueError("malformed block header: %r" % (buffer[:12],))
        nbytes = int(length_field)
        # Two bytes per sample (data:width 2). frombuffer raises ValueError
        # itself when the buffer is shorter than the requested count.
        return np.frombuffer(buffer, dtype=np.dtype('>i2'),
                             count=nbytes // 2, offset=start)

    def get_last_trace(self):
        """Fetch the current curve from the scope as raw int16 samples.

        Sends ``CURV?``, decodes the binary block, updates ``record_length``
        to the number of samples received and increments ``acqnum``.

        Returns:
            numpy array of raw (unscaled) big-endian int16 sample codes. Use
            ``scale_trace()`` to convert them with the stored preamble.

        Raises:
            ValueError: if the reply is not a well-formed, complete block.
        """
        self.dpo.write('CURV?')
        samples = self._parse_curve_block(self.dpo.read_raw())
        self.record_length = len(samples)
        self.acqnum += 1
        # Returned unscaled on purpose: callers can stay in integer math.
        return samples

    def scale_trace(self, raw):
        """Convert raw sample codes to vertical units using the preamble.

        Computes ``y_zero + y_incr * (raw - y_off)`` with the values read in
        ``__init__``.

        Args:
            raw: array-like of sample codes, e.g. from ``get_last_trace()``.

        Returns:
            numpy float array of the same length as ``raw``.
        """
        return self.y_zero + self.y_incr * (np.asarray(raw) - self.y_off)

    def setHscale(self):
        """Print the scope's current main horizontal scale.

        Despite the name this only queries ``HORizontal:MAIn:SCAle?``; it
        does not change the setting.
        """
        # Original author's note: the scale can only be 1, 2 or 4
        # (presumably the mantissa of the 1-2-4 sequence - TODO confirm).
        print(self.dpo.query("HORizontal:MAIn:SCAle?"))

    def getXAxis(self):
        """Return the horizontal axis for a trace of ``record_length`` points.

        Returns:
            numpy array ``x_zero + i * x_incr`` for ``i`` in
            ``range(record_length)``.
        """
        return self.x_zero + np.arange(0, self.record_length) * self.x_incr

    def arm(self):
        """Start a single-sequence acquisition and wait for trigger READY.

        Polls ``TRIGger:STATE?`` until the reply contains ``READY``. If that
        does not happen within ``self.timeout`` seconds a message is printed
        and the method returns anyway (no exception is raised).
        """
        self.dpo.write('ACQUIRE:STOPAfter SEQuence')
        self.dpo.write('ACQuire:STATE RUN')
        # monotonic clock: immune to wall-clock adjustments during the wait
        deadline = time.monotonic() + self.timeout
        while True:
            state = self.dpo.query('TRIGger:STATE?')
            if "READY" in state:
                return
            if deadline < time.monotonic():
                print("armed failed, trigger state %s" % (state))
                return

    def capture(self):
        """Wait until the scope reports a completed acquisition.

        Polls ``isTriggered()`` for up to ``self.timeout`` seconds.

        Returns:
            ``False`` when the scope triggered, ``True`` when the wait timed
            out. NOTE(review): "True means failure" looks deliberate (it
            matches capture APIs that return a timeout flag) - confirm with
            callers before changing.
        """
        deadline = time.monotonic() + self.timeout
        while not self.isTriggered():
            if deadline < time.monotonic():
                # One last look so a trigger landing right at the deadline
                # is not reported as a timeout.
                return not self.isTriggered()
        return False

    def isTriggered(self):
        """Return True if the ``TRIGger:STATE?`` reply contains ``SAV``."""
        state = self.dpo.query('TRIGger:STATE?')
        return "SAV" in state

    def close(self):
        """Close the instrument session and the VISA resource manager."""
        try:
            self.dpo.close()
        finally:
            # Release the manager session too, even if closing the
            # instrument raised.
            self.rm.close()
def testscope():
    """Smoke test: connect to the scope on CH1, arm it, and wait for a trigger.

    The connection is closed even if arming or capturing raises.
    """
    tek = Tek(source="CH1")
    try:
        tek.arm()
        tek.capture()
    finally:
        tek.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment