|
#!/usr/bin/python |
|
# -*- coding: utf-8 -*- |
|
|
|
import numpy as np |
|
import time |
|
|
|
from PyRedPitaya.board import RedPitaya |
|
from PyRedPitaya.instrument import TriggerSource |
|
|
|
class AWG(object): |
|
def __init__(self): |
|
self.awgA = RedPitaya().asga #TODO: initialize |
|
self.awgB = RedPitaya().asgb |
|
self.setup() |
|
|
|
def setup(self, frequency=0, amplitude=0): |
|
for awg in (self.awgA, self.awgB): |
|
#awg = self.awg |
|
|
|
awg.output_zero = True |
|
awg.sm_reset = True |
|
awg.trig_selector = 0 |
|
awg.scale = int(2**13*amplitude) #TODO: use amplitude |
|
awg.offset = 0 |
|
|
|
#waveform = np.zeros(2**14, dtype=np.int32) |
|
t = np.arange(2**14, dtype=np.float32) |
|
d = (2**13-1)*np.cos(t*(2*np.pi/2**14)) #cos for easy triggering |
|
awg.data = np.round(d).astype(np.int32) |
|
|
|
awg.start_offset = 0 |
|
awg.counter_wrap = 2**16*(2**14-1) |
|
awg.frequency = frequency |
|
|
|
#single signal output |
|
#self.sm_onetimetrigger = True |
|
#self.sm_wrappointer = False |
|
|
|
awg.sm_onetimetrigger = False |
|
awg.sm_wrappointer = True |
|
|
|
awg.output_zero = False |
|
awg.sm_reset = False |
|
|
|
def set_singleshot(self, singleshot=True): |
|
awg = self.awg |
|
|
|
#awg.sm_reset = True |
|
if singleshot: |
|
awg.sm_wrappointer = False |
|
awg.sm_onetimetrigger = True |
|
else: |
|
awg.sm_onetimetrigger = False |
|
awg.sm_wrappointer = True |
|
#awg.sm_reset = False |
|
|
|
def _trig(self, awg, frequency=None): |
|
if not frequency is None: |
|
awg.frequency = frequency |
|
awg.start_offset = 0 |
|
awg.trig_selector = 1 |
|
awg.trig_selector = 0 |
|
|
|
def trig_A(self, frequency=None): |
|
self._trig(self.awgA, frequency) |
|
|
|
def trig_B(self, frequency=None): |
|
self._trig(self.awgB, frequency) |
|
|
|
def _set_frequency(self, awg, f): |
|
awg.frequency = f |
|
return awg.frequency |
|
|
|
def set_frequency_A(self, f): |
|
return self._set_frequency(self.awgA, f) |
|
|
|
def set_frequency_B(self, f): |
|
return self._set_frequency(self.awgB, f) |
|
|
|
def _set_amplitude(self, awg, amplitude): |
|
awg.scale = int(2**13*amplitude) |
|
|
|
def set_amplitude_A(self, amplitude): |
|
self._set_amplitude(self.awgA, amplitude) |
|
|
|
def set_amplitude_B(self, amplitude): |
|
self._set_amplitude(self.awgB, amplitude) |
|
|
|
|
|
|
|
class Scope(object): |
|
TriggerSource = TriggerSource |
|
|
|
def __init__(self, |
|
decimation = 1, |
|
record_length = 1000, |
|
channel = 'A', |
|
): |
|
self.scope = RedPitaya().scope |
|
self.scope.data_decimation = decimation |
|
self.record_length = record_length |
|
self._record_length=record_length |
|
self.scope.threshold_ch1 = 0 |
|
self.scope.threshold_ch2 = 0 |
|
|
|
#record_length: samples after trigger |
|
@property |
|
def record_length(self): |
|
return self._record_length |
|
|
|
@record_length.setter |
|
def record_length(self, value): |
|
if value<0 or value>2**14: |
|
raise ValueError('record length must be in range 0 - 16384)') |
|
self._record_length = value |
|
self.scope.trigger_delay = self._record_length |
|
|
|
def get_data_after_trigger(self, channel='A'): |
|
if channel=='A': |
|
data_base_addr = 0x10000 |
|
elif channel=='B': |
|
data_base_addr = 0x20000 |
|
else: |
|
raise ValueError('unknown channel') |
|
|
|
N = self.record_length |
|
trigger_pos = self.scope.write_pointer_trigger |
|
addr = data_base_addr + 4*trigger_pos |
|
|
|
if (trigger_pos+N) <= (2**14): |
|
data = self.scope.reads(addr, N) |
|
data = data.copy() #data is read-only, why? |
|
else: |
|
N1 = 2**14 - trigger_pos |
|
N2 = N-N1 |
|
data1 = self.scope.reads(addr, N1) |
|
data2 = self.scope.reads(data_base_addr, N2) |
|
data = np.concatenate((data1, data2)) |
|
data.dtype = np.int32 |
|
data[data>2**13] -= 2**14 |
|
return data |
|
|
|
def trigger_now(self): |
|
self.scope.arm_trigger() #starts acquisition |
|
self.scope.trigger_source = TriggerSource.immediately #trigger |
|
#self.wait_for_acquisition_finished() |
|
|
|
def start(self): |
|
self.scope.arm_trigger() |
|
self.scope.trigger_source = TriggerSource.chA_posedge |
|
|
|
def wait_for_acquisition_finished(self): |
|
#after trigger: poll until trigger_source resets to zero |
|
k = 0 |
|
while self.scope.trigger_source != 0 and k < 100: |
|
time.sleep(0.001) |
|
k += 1 |
|
|
|
#wait until write pointer does not change anymore |
|
time.sleep(0.0005) #TODO: remove me |
|
|
|
|
|
class TransducerService(object): |
|
scale_amplitude = 1./8 # |
|
|
|
scale_chA = 25./2**13 #scaling scope, HV |
|
scale_chB = 1./2**13 |
|
|
|
#offsets input, used for compensation |
|
offset_chA = 0.020 #-0.038 |
|
offset_chB = 0.019 |
|
|
|
N_points = 500 |
|
|
|
R_shunt = 10. |
|
|
|
def __init__(self): |
|
self.awg = AWG() |
|
self.awg.trig_A() #start output |
|
self.awg.trig_B() |
|
|
|
self.scope = Scope() |
|
|
|
self._frequency_A = 0 |
|
self._frequency_B = 0 |
|
|
|
self.set_frequency_A(1e6) |
|
self.set_amplitude_A(0.1) |
|
|
|
self.set_frequency_B(1.31415e6) |
|
self.set_amplitude_B(0.1) |
|
|
|
def set_frequency_A(self, f): |
|
#0.5 ms without update |
|
old_value = self._frequency_A |
|
new_value = self.awg.set_frequency_A(f) |
|
#TODO: set decimation/average factor: N_points*sample_time < period: |
|
self._frequency_A = new_value |
|
if old_value != new_value: |
|
self.invalidate_ref() |
|
return self._frequency_A |
|
|
|
def set_frequency_B(self, f): |
|
#0.5 ms without update |
|
old_value = self._frequency_B |
|
new_value = self.awg.set_frequency_B(f) |
|
#TODO: set decimation/average factor: N_points*sample_time < period: |
|
self._frequency_B = new_value |
|
return self._frequency_B |
|
|
|
def update_scope(self): |
|
#set record length for complete cycles, about 1000 samples |
|
T = 1./self._frequency_A |
|
n_period = T/(8e-9*self.scope.scope.data_decimation) #points per period |
|
if n_period > 2000: |
|
raise ValueError #frequency too low -> decimate? |
|
n_cycles = np.ceil(self.N_points/n_period) #number of cycles in self.N_points |
|
n_points = int(round(n_cycles * n_period)) #adjust record length for integer number of complete cycle |
|
self.scope.record_length = n_points |
|
|
|
def invalidate_ref(self): |
|
self.ref1 = None |
|
self.ref2 = None |
|
|
|
def update_ref(self): |
|
#calculate reference (sine/cosine) signal |
|
t = 8e-9*self.scope.scope.data_decimation*np.arange(self.scope.record_length, |
|
dtype=np.float32 |
|
) |
|
ph = 2*np.pi*self._frequency_A*t |
|
self.ref1 = np.sin(ph) #slow |
|
self.ref2 = np.cos(ph) |
|
|
|
def set_amplitude_A(self, amplitude): |
|
"set desired amplitude in Vpp" |
|
self.awg.set_amplitude_A(self.scale_amplitude*amplitude) |
|
|
|
def set_amplitude_B(self, amplitude): |
|
"set desired amplitude in Vpp" |
|
self.awg.set_amplitude_B(self.scale_amplitude*amplitude) |
|
|
|
def measure(self): |
|
self.update_scope() |
|
self.scope.start() |
|
#self.scope.trigger_now() |
|
self.scope.wait_for_acquisition_finished() |
|
self.data_chA = self.scope.get_data_after_trigger(channel='A').astype(np.float32)*self.scale_chA - self.offset_chA |
|
self.data_chB = self.scope.get_data_after_trigger(channel='B').astype(np.float32)*self.scale_chB - self.offset_chB |
|
|
|
def analyze_data(self): |
|
#1 ms, 500pts |
|
if self.ref1 is None: |
|
self.update_ref() |
|
|
|
data_A = self.data_chA |
|
|
|
X1 = np.mean(self.ref1*self.data_chA) |
|
Y1 = np.mean(self.ref2*self.data_chA) |
|
|
|
X2 = np.mean(self.ref1*self.data_chB) |
|
Y2 = np.mean(self.ref2*self.data_chB) |
|
|
|
U1 = np.complex64(complex(X1,Y1)) |
|
U2 = np.complex64(complex(X2,Y2)) |
|
|
|
A1 = abs(U1) |
|
A2 = abs(U2) |
|
phi1 = np.angle(U1) |
|
phi2 = np.angle(U2) |
|
|
|
I = U2/self.R_shunt #current shunt = current piezo |
|
U = U1-U2 #current across piezo |
|
Z = U/I #impedance piezo |
|
|
|
return 4*A1, phi1, 4*A2, phi2, 4*abs(U), Z |
|
|
|
def sweep(self, N=201, f_min=1e6, f_max=30e6, linear=False, amplitude = None): |
|
"""perform sweep, |
|
return (complex) amplitude amplifier & shunt, voltage DUT, complex impedance |
|
""" |
|
if amplitude is not None: |
|
self.set_amplitude(amplitude) |
|
|
|
if linear: |
|
ff = np.linspace(f_min, f_max, N) |
|
else: |
|
ff = np.logspace(np.log10(f_min), np.log10(f_max), N) |
|
|
|
A = np.zeros((N,2), dtype=np.complex64) |
|
U = np.zeros((N,), dtype=np.float32) |
|
Z = np.zeros((N,), dtype=np.complex64) |
|
|
|
for k, f in enumerate(ff): |
|
self.set_frequency_A(f) |
|
#self.update_scope #performance optimization ???? |
|
#self.update_ref() #performance optimization |
|
self.measure() |
|
A1, phi1, A2, phi2, u, z = self.analyze_data() |
|
A[k] = A1, A2 |
|
Z[k] = z |
|
U[k] = u |
|
return ff, A, U, Z |
|
|
|
def identify(self): |
|
return 'RedPitaya transducer control' |
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
ctrl = TransducerService() |