Skip to content

Instantly share code, notes, and snippets.

@hypoxic
Created October 15, 2020 01:54
Show Gist options
  • Save hypoxic/2ea7798d442bb6570bb9ecd8b704b48e to your computer and use it in GitHub Desktop.
Save hypoxic/2ea7798d442bb6570bb9ecd8b704b48e to your computer and use it in GitHub Desktop.
Code used to capture a waveform from the Tek TDS3000 series and send it to python
# Hypoxic Capture via TDS 3034B
# Requires pyvisa and NI-VISA. Not tested much, but shared in case it is helpful to others.
import pyvisa
import numpy as np
import time as time
class Tek:
    """Minimal pyvisa wrapper for grabbing waveforms from a Tek TDS3000 scope.

    On construction the first VISA resource reported by the resource manager
    is opened, the waveform transfer is configured (binary, 2 bytes per
    sample) and the waveform preamble scaling values are cached on the
    instance.

    Attributes set by ``__init__``:
        timeout        -- seconds that ``arm`` / ``capture`` poll before giving up
        acqnum         -- number of traces fetched via ``get_last_trace``
        x_incr, x_zero -- horizontal scaling read from WFMPre:XINcr / XZEro
        y_incr         -- vertical multiplier read from WFMPre:YMUlt
        y_off, y_zero  -- vertical offsets read from WFMPre:YOFf / YZEro
        record_length  -- points per record (WFMPre:NR_Pt), refreshed after
                          every ``get_last_trace`` call
    """

    def __init__(self, source = "CH1"):
        """Connect to the scope and prepare waveform transfers from *source*.

        Args:
            source: channel name sent with ``data:source`` (default "CH1").

        Raises:
            IndexError: if the VISA resource manager lists no instruments.
        """
        self.timeout = 5.0
        #DATASOURCE = "CH1"
        #RESOURCE = "TCPIP0::192.168.1.170::inst0::INSTR"
        self.acqnum = 0
        self.rm = pyvisa.ResourceManager()

        resources = self.rm.list_resources()
        if not resources:
            # Same exception type as the bare [0] lookup, but with a message
            # that says what is actually wrong.
            raise IndexError("no VISA resources found; is the scope connected?")
        self.resource = resources[0]
        print("Connecting to %s" % (self.resource))

        self.dpo = self.rm.open_resource(self.resource)
        self.dpo.timeout = 3000
        self.dpo.encoding = 'latin_1'
        self.dpo.write_termination = None
        self.dpo.read_termination = '\n'
        print(self.dpo.query('*IDN?'))

        # setup the transfer
        self.dpo.write('data:source %s' % (source))
        self.dpo.write('header 0')
        self.dpo.write('data:encdg rib')  # signed integer, msb first
        self.dpo.write('data:width 2')
        self.dpo.write('data:resolution full')

        # Getting scope's time scale
        #print(dpo.query("WFMPre?"))
        self.x_incr = float(self.dpo.query('WFMPre:XINcr?'))
        self.x_zero = float(self.dpo.query("WFMPre:XZEro?"))
        self.y_incr = float(self.dpo.query("WFMPre:YMUlt?"))
        self.y_off = float(self.dpo.query("WFMPre:YOFf?"))
        self.y_zero = float(self.dpo.query("WFMPre:YZEro?"))
        self.record_length = int(self.dpo.query("WFMPre:NR_Pt?"))
        print("x_zero %f" % self.x_zero)
        print("y_incr %f" % self.y_incr)
        print("y_off %f" % self.y_off)
        print("y_zero %f" % self.y_zero)
        print("record_length %d" % self.record_length)

    @staticmethod
    def _decode_curve_block(buffer):
        """Decode a ``#<n><length><data>`` binary block into int16 samples.

        The header is ``#``, one ASCII digit *n*, then *n* ASCII digits giving
        the payload size in bytes. Anything after the declared payload (such
        as a trailing newline) is ignored.

        Args:
            buffer: raw bytes as returned by ``read_raw``.

        Returns:
            A numpy array of big-endian signed 16-bit samples.

        Raises:
            ValueError: if the header is missing or malformed, or the payload
                is shorter than the header declares.
        """
        if buffer[:1] != b'#':
            raise ValueError("curve response does not start with a '#' block header")

        # Slice (not index) so we get b'4' rather than the byte value 52;
        # int() accepts ASCII digits in bytes form.
        n_digits = int(buffer[1:2])
        if n_digits == 0:
            raise ValueError("indefinite-length block responses are not supported")

        data_start = 2 + n_digits
        n_bytes = int(buffer[2:data_start])
        if len(buffer) - data_start < n_bytes:
            raise ValueError(
                "curve response truncated: header declares %d bytes, got %d"
                % (n_bytes, len(buffer) - data_start)
            )

        sample_dtype = np.dtype('int16').newbyteorder('>')
        n_samples = n_bytes // sample_dtype.itemsize
        if n_samples == 0:
            return np.empty(0, dtype=sample_dtype)
        return np.frombuffer(
            buffer, dtype=sample_dtype, count=n_samples, offset=data_start
        )

    def get_last_trace(self):
        """Fetch the current waveform from the scope as raw integer samples.

        Sends ``CURV?``, decodes the binary block response and returns the
        samples unscaled (no float conversion is done here, for speed). The
        cached ``y_off`` / ``y_incr`` / ``y_zero`` values are left for the
        caller to apply if scaled values are needed.

        Side effects: ``record_length`` is set to the number of samples
        actually received and ``acqnum`` is incremented.

        Returns:
            numpy array of big-endian int16 samples.

        Raises:
            ValueError: if the response is not a well-formed binary block.
        """
        self.dpo.write('CURV?')
        samples = self._decode_curve_block(self.dpo.read_raw())

        # Trust what was received rather than the preamble's NR_Pt.
        self.record_length = len(samples)
        self.acqnum += 1
        return samples

    def setHscale(self):
        """Print the scope's current main horizontal scale.

        Despite the name this only queries; the write is left commented out.
        """
        # can only be 1,2 or 4
        print(self.dpo.query("HORizontal:MAIn:SCAle?"))
        #print(self.dpo.write("HORizontal:MAIn:SCAle 4.0E-7"))
        #print(self.dpo.query("HORizontal:MAIn:SCAle?"))

    def getXAxis(self):
        """Return the X axis for the current record.

        Returns:
            numpy array of ``record_length`` values starting at ``x_zero``
            and spaced by ``x_incr``.
        """
        return self.x_zero + np.arange(0, self.record_length) * self.x_incr

    def arm(self):
        """Start a single-sequence acquisition and wait for the trigger to be ready.

        Polls ``TRIGger:STATE?`` until the reply contains "READY". If that
        does not happen within ``self.timeout`` seconds a message is printed
        and the method returns anyway.
        """
        self.dpo.write('ACQUIRE:STOPAfter SEQuence')
        self.dpo.write('ACQuire:STATE RUN')
        deadline = time.time() + self.timeout
        while "READY" not in self.dpo.query('TRIGger:STATE?'):
            #print(self.dpo.query('TRIGger:STATE?'))
            if deadline < time.time():
                print("armed failed, trigger state %s" % (self.dpo.query('TRIGger:STATE?')))
                break

    def capture(self):
        """Wait until the scope reports a completed acquisition.

        Polls ``isTriggered`` for up to ``self.timeout`` seconds.

        Returns:
            False once the scope has triggered, True if the wait timed out
            (i.e. the return value is an error flag, not a success flag).
        """
        deadline = time.time() + self.timeout
        while not self.isTriggered():
            if deadline < time.time():
                # Give the scope one last chance before reporting a timeout.
                return not self.isTriggered()
        return False

    def isTriggered(self):
        """Return True if the ``TRIGger:STATE?`` reply contains "SAV"."""
        state = self.dpo.query('TRIGger:STATE?')
        return "SAV" in state

    def close(self):
        """Close the VISA session to the scope."""
        self.dpo.close()
def testscope():
    """Smoke test: connect to the scope on CH1, arm it and wait for a capture.

    The connection is always closed, even if arming or capturing raises.
    """
    tek = Tek(source = "CH1")
    try:
        tek.arm()
        tek.capture()
    finally:
        # Release the VISA session so a failed run does not leave the
        # instrument locked.
        tek.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment