Skip to content

Instantly share code, notes, and snippets.

@geggo
Created April 21, 2017 08:20
Show Gist options
  • Save geggo/baf1b9b804056c1766a39cca50c16e95 to your computer and use it in GitHub Desktop.
Save geggo/baf1b9b804056c1766a39cca50c16e95 to your computer and use it in GitHub Desktop.
Red Pitaya Python Signal Generator and Vector Analyzer

Red Pitaya

outputs: summed and amplified to transducer (CH B optional)

inputs: CH A voltage at amplifier output CH B: voltage across 10 Ohm shunt resistor, in series with transducer

service_transducer.py:

  • set frequency/amplitude of CH A/B
  • acquire data / analyze data (amplitude, phase)

server.py:

generate 1 0.1 1000 1000 sine nohup python server.py & tail -f nohup.out

Host PC

RedPitayaProxy.py: transparently provide access to service_transducer.py via network

RedPitaya.py: user interface

import sys
import time
from traits.api import *
from traitsui.api import *
from traits_persistence import HasTraitsPersistent
import numpy as np
from RedPitayaProxy import RP_Proxy
import sys, traceback
#RP_address = 'tcp://192.168.52.55:5555'
class RedPitaya(HasTraitsPersistent):
settings_id = 'redpitaya'
proxy = Python(transient=True)
frequency_kHz = Range(0, 40000)
frequency = Float
frequency_real = Float
frequency_B = Float
frequency_B_real = Float
voltage = Float
voltage_real = Float
voltage_B = Float
voltage_B_real = Float
measure = Button
voltage_transducer = Float
impedance = Float
phase = Float
sweep_start = Button
sweep_active = Bool(False, transient=True)
sweep_frequency_min = Float
sweep_frequency_max = Float
sweep_logarithmic = Bool
sweep_number_points = Int
results = Python(transient=True)
view = View(
VGroup(
Item('frequency_kHz'),
HGroup(
Item('frequency'),
Item('voltage'),
),
HGroup(
Item('frequency_B'),
Item('voltage_B'),
),
HGroup(
Item('frequency_real', style='readonly'),
Item('frequency_B_real', style='readonly'),
),
HGroup(
Item('measure'),
Item('voltage_transducer', style='readonly'),
Item('impedance', style='readonly'),
Item('phase', style='readonly'),
),
VGroup(
Item('sweep_start', enabled_when = 'not sweep_active'),
Item('sweep_frequency_min'),
Item('sweep_frequency_max'),
Item('sweep_number_points'),
Item('sweep_logarithmic'),
),
),
resizable=True,
)
def __init__(self, *args, **kwargs):
super(RedPitaya, self).__init__(*args, **kwargs)
self.proxy = RP_Proxy()
self.proxy.REQUEST_TIMEOUT = 1000 #1s timeout
answer = self.proxy.identify()
if 'RedPitaya' not in answer:
print 'RedPitaya not found, switch it on and start server'
raise RuntimeError('Cannot open device')
self.on_trait_change(self.start_sweep,
'sweep_start',
dispatch='new')
def _frequency_kHz_changed(self, value):
self.frequency = value*1e-3
def _frequency_changed(self, value):
self.frequency_real = self.proxy.set_frequency_A(value*1e6)
def _voltage_changed(self, value):
self.proxy.set_amplitude_A(value)
def _frequency_B_changed(self, value):
self.frequency_B_real = self.proxy.set_frequency_B(value*1e6)
def _voltage_B_changed(self, value):
self.proxy.set_amplitude_B(value)
def _measure_fired(self, value):
self.proxy.measure()
a1, ph1, a2, ph2, U, Z = self.proxy.analyze_data()
self.impedance, self.phase, self.voltage_transducer = abs(Z), 180./np.pi*np.angle(Z), U
#dispatched in new thread
def start_sweep(self, value):
if self.sweep_active:
print "sweep active, ignored"
return
try:
self.sweep_active=True
print "sweep start"
self.perform_sweep()
print "sweep done"
except Exception as e:
print "Error during sweep:", e
exc_type, exc_value, exc_tb = sys.exc_info()
info = str(e)
print '\n'.join(traceback.format_exception(exc_type, exc_value, exc_tb)[2:])
finally:
self.sweep_active=False
def perform_sweep(self):
"""start frequency sweep, set self.results with results"""
#sweep takes longer, increase timeout
timeout = int(1000. * 3./100*self.sweep_number_points)
self.proxy.REQUEST_TIMEOUT = timeout
#tic = time.clock()
r = self.proxy.sweep(N=self.sweep_number_points,
f_min = self.sweep_frequency_min*1e6,
f_max = self.sweep_frequency_max*1e6,
linear = not self.sweep_logarithmic)
#print "sweep finished in %.2fs, timeout was %.2fs" % (time.clock()-tic, timeout*1e-3)
self.proxy.REQUEST_TIMEOUT = 1000
if r is not None:
f, A, U, Z = r
r = dict(RP_frequency = f,
RP_voltage_transducer = U,
RP_impedance_transducer = Z,
RP_voltages = A)
self.results = r
else:
print "sweep failed"
self._frequency_changed(self.frequency) #reset frequency
if __name__ == '__main__':
rp = RedPitaya()
#print "type in rp.configure_traits() !"
rp.load_settings()
#rp.sweep_start = True
#rp.edit_traits()
rp.configure_traits()
import sys
import zmq
import pickle
import time
#SERVER_ENDPOINT = "tcp://red-pitaya.i-med.intern:5555"
SERVER_ENDPOINT = "tcp://192.168.52.51:5555"
#SERVER_ENDPOINT = "tcp://redpitaya.local:5555"
context = zmq.Context(1)
class _Method:
def __init__(self, proxy, command):
self.__proxy = proxy
self.__command = command
def __call__(self, *args, **kwargs):
return self.__proxy.remote_call(self.__command, *args, **kwargs)
class RP_Proxy(object):
REQUEST_RETRIES = 2
REQUEST_TIMEOUT = 2000
def __init__(self):
self.poll = zmq.Poller()
self._init_connection()
self.sequence = 0
def _init_connection(self):
print "I: Connecting to server"
self.client = context.socket(zmq.REQ)
self.client.connect(SERVER_ENDPOINT)
self.poll.register(self.client, zmq.POLLIN)
def __getattr__(self, method):
if method.startswith('_') or method=='trait_names':
raise AttributeError
return _Method(self, method)
def remote_call(self, command, *args, **kwargs):
#print "I: Sending command (%s)" % command, args, kwargs
self.sequence += 1
request = (command,
str(self.sequence),
pickle.dumps(args),
pickle.dumps(kwargs))
self.client.send_multipart(request)
retries_left = self.REQUEST_RETRIES
result = None
expect_reply = True
while expect_reply:
socks = dict(self.poll.poll(self.REQUEST_TIMEOUT))
if socks.get(self.client) == zmq.POLLIN: #got some message
status, s, msg = self.client.recv_multipart()
sequence_received = int(s)
result = pickle.loads(msg)
#if not status:
# break
if sequence_received != self.sequence:
print "got wrong sequence number"
elif status == 'OK' or status == 'ERROR':
#print 'got result', status, result
break
#expect_reply = False
else:
break
else: #timeout
print "W: No response from server, retrying"
# Socket is confused. Close and remove it.
self.client.setsockopt(zmq.LINGER, 0)
self.client.close()
self.poll.unregister(self.client)
retries_left -= 1
if retries_left == 0:
print "E: Server seems to be offline, abandoning"
#TODO: raise Exception?
break
#reconnect and resend
#print "I: Reconnecting and resending (%s)" % command
self._init_connection()
self.client.send_multipart(request)
return result
if __name__ == '__main__':
print "on RedPitaya, start server.py"
proxy = RP_Proxy()
#print proxy.remote_call('say_hello_array', 13, name = 'my dear client')
print proxy.identify()
#print proxy.say_hello_array(42)
#context.term()
#context.destroy()
#!/usr/bin/python
# -*- coding: utf-8 -*-
import zmq
import pickle
import sys, traceback
context = zmq.Context(1)
def serve_forever(service):
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")
print "started server"
try:
while True:
#receive command and unserialize arguments
command, sequence, args, kwargs = server.recv_multipart()
args = pickle.loads(args)
kwargs = pickle.loads(kwargs)
#print "got command:", command, sequence
#print "arguments:", args
#print "kwargs:", kwargs
try:
method = service.__getattribute__(command) #lookup method
result = method(*args, **kwargs) #execute
except (AttributeError, Exception) as e:
status = 'ERROR'
result = e
#print e
exc_type, exc_value, exc_tb = sys.exc_info()
info = str(e)
print '\n'.join(traceback.format_exception(exc_type, exc_value, exc_tb)[2:])
else:
status = 'OK'
server.send_multipart((status,
bytes(sequence),
pickle.dumps(result)))
print "command done:", status, command, sequence
sys.stdout.flush()
except KeyboardInterrupt:
print "closing down server"
server.setsockopt(zmq.LINGER, 0)
server.close()
def test_simple_service():
import numpy
import time
class TestService(object):
def say_hello(self, nr, message = 'hi', name = 'gregor'):
return "Agent #%03d says '%s' to %s"%(nr, message, name)
def say_hello_array(self, nr, name='no name'):
return numpy.array([nr], dtype = numpy.complex64)
print "starting simple service"
serve_forever(TestService())
def serve_transducer():
from service_transducer import TransducerService
service = TransducerService()
serve_forever(service)
if __name__ == '__main__':
#test_simple_service()
serve_transducer()
#!/usr/bin/python
# -*- coding: utf-8 -*-
import numpy as np
import time
from PyRedPitaya.board import RedPitaya
from PyRedPitaya.instrument import TriggerSource
class AWG(object):
def __init__(self):
self.awgA = RedPitaya().asga #TODO: initialize
self.awgB = RedPitaya().asgb
self.setup()
def setup(self, frequency=0, amplitude=0):
for awg in (self.awgA, self.awgB):
#awg = self.awg
awg.output_zero = True
awg.sm_reset = True
awg.trig_selector = 0
awg.scale = int(2**13*amplitude) #TODO: use amplitude
awg.offset = 0
#waveform = np.zeros(2**14, dtype=np.int32)
t = np.arange(2**14, dtype=np.float32)
d = (2**13-1)*np.cos(t*(2*np.pi/2**14)) #cos for easy triggering
awg.data = np.round(d).astype(np.int32)
awg.start_offset = 0
awg.counter_wrap = 2**16*(2**14-1)
awg.frequency = frequency
#single signal output
#self.sm_onetimetrigger = True
#self.sm_wrappointer = False
awg.sm_onetimetrigger = False
awg.sm_wrappointer = True
awg.output_zero = False
awg.sm_reset = False
def set_singleshot(self, singleshot=True):
awg = self.awg
#awg.sm_reset = True
if singleshot:
awg.sm_wrappointer = False
awg.sm_onetimetrigger = True
else:
awg.sm_onetimetrigger = False
awg.sm_wrappointer = True
#awg.sm_reset = False
def _trig(self, awg, frequency=None):
if not frequency is None:
awg.frequency = frequency
awg.start_offset = 0
awg.trig_selector = 1
awg.trig_selector = 0
def trig_A(self, frequency=None):
self._trig(self.awgA, frequency)
def trig_B(self, frequency=None):
self._trig(self.awgB, frequency)
def _set_frequency(self, awg, f):
awg.frequency = f
return awg.frequency
def set_frequency_A(self, f):
return self._set_frequency(self.awgA, f)
def set_frequency_B(self, f):
return self._set_frequency(self.awgB, f)
def _set_amplitude(self, awg, amplitude):
awg.scale = int(2**13*amplitude)
def set_amplitude_A(self, amplitude):
self._set_amplitude(self.awgA, amplitude)
def set_amplitude_B(self, amplitude):
self._set_amplitude(self.awgB, amplitude)
class Scope(object):
TriggerSource = TriggerSource
def __init__(self,
decimation = 1,
record_length = 1000,
channel = 'A',
):
self.scope = RedPitaya().scope
self.scope.data_decimation = decimation
self.record_length = record_length
self._record_length=record_length
self.scope.threshold_ch1 = 0
self.scope.threshold_ch2 = 0
#record_length: samples after trigger
@property
def record_length(self):
return self._record_length
@record_length.setter
def record_length(self, value):
if value<0 or value>2**14:
raise ValueError('record length must be in range 0 - 16384)')
self._record_length = value
self.scope.trigger_delay = self._record_length
def get_data_after_trigger(self, channel='A'):
if channel=='A':
data_base_addr = 0x10000
elif channel=='B':
data_base_addr = 0x20000
else:
raise ValueError('unknown channel')
N = self.record_length
trigger_pos = self.scope.write_pointer_trigger
addr = data_base_addr + 4*trigger_pos
if (trigger_pos+N) <= (2**14):
data = self.scope.reads(addr, N)
data = data.copy() #data is read-only, why?
else:
N1 = 2**14 - trigger_pos
N2 = N-N1
data1 = self.scope.reads(addr, N1)
data2 = self.scope.reads(data_base_addr, N2)
data = np.concatenate((data1, data2))
data.dtype = np.int32
data[data>2**13] -= 2**14
return data
def trigger_now(self):
self.scope.arm_trigger() #starts acquisition
self.scope.trigger_source = TriggerSource.immediately #trigger
#self.wait_for_acquisition_finished()
def start(self):
self.scope.arm_trigger()
self.scope.trigger_source = TriggerSource.chA_posedge
def wait_for_acquisition_finished(self):
#after trigger: poll until trigger_source resets to zero
k = 0
while self.scope.trigger_source != 0 and k < 100:
time.sleep(0.001)
k += 1
#wait until write pointer does not change anymore
time.sleep(0.0005) #TODO: remove me
class TransducerService(object):
scale_amplitude = 1./8 #
scale_chA = 25./2**13 #scaling scope, HV
scale_chB = 1./2**13
#offsets input, used for compensation
offset_chA = 0.020 #-0.038
offset_chB = 0.019
N_points = 500
R_shunt = 10.
def __init__(self):
self.awg = AWG()
self.awg.trig_A() #start output
self.awg.trig_B()
self.scope = Scope()
self._frequency_A = 0
self._frequency_B = 0
self.set_frequency_A(1e6)
self.set_amplitude_A(0.1)
self.set_frequency_B(1.31415e6)
self.set_amplitude_B(0.1)
def set_frequency_A(self, f):
#0.5 ms without update
old_value = self._frequency_A
new_value = self.awg.set_frequency_A(f)
#TODO: set decimation/average factor: N_points*sample_time < period:
self._frequency_A = new_value
if old_value != new_value:
self.invalidate_ref()
return self._frequency_A
def set_frequency_B(self, f):
#0.5 ms without update
old_value = self._frequency_B
new_value = self.awg.set_frequency_B(f)
#TODO: set decimation/average factor: N_points*sample_time < period:
self._frequency_B = new_value
return self._frequency_B
def update_scope(self):
#set record length for complete cycles, about 1000 samples
T = 1./self._frequency_A
n_period = T/(8e-9*self.scope.scope.data_decimation) #points per period
if n_period > 2000:
raise ValueError #frequency too low -> decimate?
n_cycles = np.ceil(self.N_points/n_period) #number of cycles in self.N_points
n_points = int(round(n_cycles * n_period)) #adjust record length for integer number of complete cycle
self.scope.record_length = n_points
def invalidate_ref(self):
self.ref1 = None
self.ref2 = None
def update_ref(self):
#calculate reference (sine/cosine) signal
t = 8e-9*self.scope.scope.data_decimation*np.arange(self.scope.record_length,
dtype=np.float32
)
ph = 2*np.pi*self._frequency_A*t
self.ref1 = np.sin(ph) #slow
self.ref2 = np.cos(ph)
def set_amplitude_A(self, amplitude):
"set desired amplitude in Vpp"
self.awg.set_amplitude_A(self.scale_amplitude*amplitude)
def set_amplitude_B(self, amplitude):
"set desired amplitude in Vpp"
self.awg.set_amplitude_B(self.scale_amplitude*amplitude)
def measure(self):
self.update_scope()
self.scope.start()
#self.scope.trigger_now()
self.scope.wait_for_acquisition_finished()
self.data_chA = self.scope.get_data_after_trigger(channel='A').astype(np.float32)*self.scale_chA - self.offset_chA
self.data_chB = self.scope.get_data_after_trigger(channel='B').astype(np.float32)*self.scale_chB - self.offset_chB
def analyze_data(self):
#1 ms, 500pts
if self.ref1 is None:
self.update_ref()
data_A = self.data_chA
X1 = np.mean(self.ref1*self.data_chA)
Y1 = np.mean(self.ref2*self.data_chA)
X2 = np.mean(self.ref1*self.data_chB)
Y2 = np.mean(self.ref2*self.data_chB)
U1 = np.complex64(complex(X1,Y1))
U2 = np.complex64(complex(X2,Y2))
A1 = abs(U1)
A2 = abs(U2)
phi1 = np.angle(U1)
phi2 = np.angle(U2)
I = U2/self.R_shunt #current shunt = current piezo
U = U1-U2 #current across piezo
Z = U/I #impedance piezo
return 4*A1, phi1, 4*A2, phi2, 4*abs(U), Z
def sweep(self, N=201, f_min=1e6, f_max=30e6, linear=False, amplitude = None):
"""perform sweep,
return (complex) amplitude amplifier & shunt, voltage DUT, complex impedance
"""
if amplitude is not None:
self.set_amplitude(amplitude)
if linear:
ff = np.linspace(f_min, f_max, N)
else:
ff = np.logspace(np.log10(f_min), np.log10(f_max), N)
A = np.zeros((N,2), dtype=np.complex64)
U = np.zeros((N,), dtype=np.float32)
Z = np.zeros((N,), dtype=np.complex64)
for k, f in enumerate(ff):
self.set_frequency_A(f)
#self.update_scope #performance optimization ????
#self.update_ref() #performance optimization
self.measure()
A1, phi1, A2, phi2, u, z = self.analyze_data()
A[k] = A1, A2
Z[k] = z
U[k] = u
return ff, A, U, Z
def identify(self):
return 'RedPitaya transducer control'
if __name__ == '__main__':
ctrl = TransducerService()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment