Created
October 15, 2020 01:54
-
-
Save hypoxic/2ea7798d442bb6570bb9ecd8b704b48e to your computer and use it in GitHub Desktop.
Code used to capture a waveform from a Tektronix TDS3000-series oscilloscope and transfer it to Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Hypoxic capture via a Tektronix TDS 3034B
# Requires pyvisa and NI-VISA. Only lightly tested, but shared in case it is helpful to others.
import pyvisa | |
import numpy as np | |
import time as time | |
class Tek:
    """Small PyVISA wrapper for pulling waveforms off a Tektronix TDS3000 scope.

    On construction the instrument is opened, the curve transfer is set to
    2-byte signed big-endian samples, and the waveform preamble (time base and
    vertical scaling) is read once and cached on the instance.  The cached
    preamble is not refreshed automatically if the scope settings change.
    """

    def __init__(self, source="CH1", resource=None, timeout=5.0):
        """Open the scope and configure the waveform transfer.

        Args:
            source: Value sent with ``data:source`` (e.g. ``"CH1"``).
            resource: VISA resource string to open.  When ``None`` the first
                entry returned by ``list_resources()`` is used, as before.
            timeout: Seconds that ``arm()`` and ``capture()`` poll before
                giving up.

        Raises:
            IndexError: if ``resource`` is ``None`` and no VISA resource is
                found.
        """
        self.timeout = timeout
        self.acqnum = 0  # number of traces fetched through get_last_trace()
        self.rm = pyvisa.ResourceManager()
        if resource is None:
            found = self.rm.list_resources()
            if not found:
                # Same exception type as the bare [0] lookup, but actionable.
                raise IndexError("no VISA resources found; is the scope connected?")
            resource = found[0]
        self.resource = resource
        print("Connecting to %s" % (self.resource))
        self.dpo = self.rm.open_resource(self.resource)
        try:
            self._configure(source)
        except Exception:
            # Do not leak the VISA session when setup fails half-way.
            self.dpo.close()
            raise

    def _configure(self, source):
        """Apply session settings, set up the transfer and cache the preamble."""
        self.dpo.timeout = 3000
        self.dpo.encoding = 'latin_1'
        self.dpo.write_termination = None
        # NOTE(review): a read termination character is configured although
        # CURV? returns binary data; confirm reads are not cut short at a
        # 0x0A data byte on your interface.
        self.dpo.read_termination = '\n'
        print(self.dpo.query('*IDN?'))
        # setup the transfer
        self.dpo.write('data:source %s' % (source))
        self.dpo.write('header 0')
        self.dpo.write('data:encdg rib')  # signed integer, msb first
        self.dpo.write('data:width 2')
        self.dpo.write('data:resolution full')
        # Getting scope's time scale and vertical scaling
        self.x_incr = float(self.dpo.query('WFMPre:XINcr?'))
        self.x_zero = float(self.dpo.query("WFMPre:XZEro?"))
        self.y_incr = float(self.dpo.query("WFMPre:YMUlt?"))
        self.y_off = float(self.dpo.query("WFMPre:YOFf?"))
        self.y_zero = float(self.dpo.query("WFMPre:YZEro?"))
        self.record_length = int(self.dpo.query("WFMPre:NR_Pt?"))
        print("x_zero %f" % self.x_zero)
        print("y_incr %f" % self.y_incr)
        print("y_off %f" % self.y_off)
        print("y_zero %f" % self.y_zero)
        print("record_length %d" % self.record_length)

    @staticmethod
    def _decode_curve(buffer):
        """Decode a ``#<n><length><data>`` binary block into int16 samples.

        The header is ``#``, one ASCII digit ``n``, then ``n`` ASCII digits
        giving the payload size in bytes.  Anything after the payload (such
        as a trailing newline) is ignored.

        Raises:
            ValueError: if the block header is missing or malformed.
        """
        prefix = bytes(buffer[:2])
        if len(prefix) < 2 or prefix[:1] != b'#' or not prefix[1:].isdigit():
            raise ValueError("CURV? response has no '#<n>' block header: %r"
                             % (bytes(buffer[:16]),))
        # Slice (not index) so the ASCII digit is parsed as text: indexing a
        # bytes object yields the character code (b'5'[0] == 53), which
        # over-skips the header and silently drops leading samples.
        ndigits = int(prefix[1:])
        length_field = bytes(buffer[2:2 + ndigits])
        if ndigits == 0 or len(length_field) != ndigits or not length_field.isdigit():
            raise ValueError("CURV? response has a malformed block length: %r"
                             % (bytes(buffer[:16]),))
        nbytes = int(length_field)
        start = 2 + ndigits
        available = len(buffer) - start
        if available < nbytes:
            print("warning: curve block announced %d bytes but only %d were read"
                  % (nbytes, available))
            nbytes = available
        nbytes -= nbytes % 2  # whole 2-byte samples only
        sample_type = np.dtype('int16').newbyteorder('>')
        if nbytes == 0:
            return np.empty(0, dtype=sample_type)
        return np.frombuffer(buffer, dtype=sample_type,
                             count=nbytes // 2, offset=start)

    def get_last_trace(self, scaled=False):
        """Fetch the current waveform with ``CURV?``.

        Updates ``record_length`` to the number of samples received and
        increments ``acqnum``.

        Args:
            scaled: When false (default) the raw big-endian int16 digitizer
                levels are returned.  When true the samples are converted
                with the cached preamble as
                ``(raw - y_off) * y_incr + y_zero``.

        Returns:
            A numpy array with one element per sample.
        """
        self.dpo.write('CURV?')
        samples = self._decode_curve(self.dpo.read_raw())
        self.record_length = len(samples)
        self.acqnum += 1
        if scaled:
            return (samples - self.y_off) * self.y_incr + self.y_zero
        # Raw integers by default: callers can stay in int math for speed.
        return samples

    def setHscale(self):
        """Print the current horizontal scale.

        Despite the name, nothing is written to the instrument.
        Original author's note: the scale can only be 1, 2 or 4.
        """
        print(self.dpo.query("HORizontal:MAIn:SCAle?"))

    def getXAxis(self):
        """Return the time axis matching the most recent ``record_length``.

        Built from the ``x_zero`` and ``x_incr`` values cached at construction.
        """
        return self.x_zero + np.arange(0, self.record_length) * self.x_incr

    def arm(self):
        """Start a single-sequence acquisition and wait for the READY state.

        Polls ``TRIGger:STATE?`` for up to ``self.timeout`` seconds.  On
        timeout a message is printed and the method returns normally.
        """
        self.dpo.write('ACQUIRE:STOPAfter SEQuence')
        self.dpo.write('ACQuire:STATE RUN')
        deadline = time.time() + self.timeout
        while "READY" not in self.dpo.query('TRIGger:STATE?'):
            if deadline < time.time():
                print("armed failed, trigger state %s"
                      % (self.dpo.query('TRIGger:STATE?')))
                break

    def capture(self):
        """Wait until ``isTriggered()`` reports true.

        Returns:
            ``False`` once the scope has triggered, ``True`` if
            ``self.timeout`` seconds elapsed first (i.e. true means failure).
        """
        deadline = time.time() + self.timeout
        while True:
            if self.isTriggered():
                return False
            if deadline < time.time():
                return True

    def isTriggered(self):
        """Return True when the ``TRIGger:STATE?`` reply contains ``SAV``."""
        state = self.dpo.query('TRIGger:STATE?')
        return "SAV" in state

    def close(self):
        """Close the VISA session to the scope."""
        self.dpo.close()
def testscope():
    """Smoke test: connect to the scope on CH1, arm it and wait for a trigger.

    The session is closed even if arming or capturing raises, so a failed run
    does not leave the instrument connection open.
    """
    tek = Tek(source="CH1")
    try:
        tek.arm()
        tek.capture()
    finally:
        tek.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment