Skip to content

Instantly share code, notes, and snippets.

@ppkliu
Forked from pklaus/scpi_tcp.py
Created May 22, 2021
Embed
What would you like to do?
Python script to send commands to a Rigol scope (or any LXI/SCPI instrument) from first principles.
#!/usr/bin/env python
"""
Zeroconf Discovery for Rigol DS1000Z-series scopes
--------------------------------------------------
Documentation worth looking at:
* http://lxistandard.org/Documents/Specifications/LXI%20Device%20Specification%202011%20rev%201.4.pdf
* http://lxistandard.org/GuidesForUsingLXI/Introducing%20LXI%20To%20Your%20Network%20Administrator%20May%2024_2013.pdf
* http://lxistandard.org/GuidesForUsingLXI/LXI_Getting_Started_Guide_May_1_2013.pdf
* http://beyondmeasure.rigoltech.com/acton/attachment/1579/f-0386/1/-/-/-/-/DS1000Z_Programming%20Guide_EN.pdf
"""
from zeroconf import *
import socket
import time
import requests
from lxml import etree
import re
try:
clock = time.perf_counter
except AttributeError:
clock = time.time
class Listener(object):
def __init__(self, filter_func=None):
self.results = []
self.filter_func = filter_func
def remove_service(self, zeroconf, zc_type, zc_name):
#print('Service "{0}" removed'.format(zc_name))
pass
def add_service(self, zeroconf, zc_type, zc_name):
zc_info = zeroconf.get_service_info(zc_type, zc_name)
zc_info._properties = {k: v for k, v in zc_info.properties.items() if v is not None}
result = {
'zc_name' : zc_name,
'zc_type' : zc_type,
'zc_info' : zc_info,
}
if self.filter_func:
if self.filter_func(result):
self.results.append(result)
else:
self.results.append(result)
@staticmethod
def pprint(zc_name, zc_type, zc_info):
print('\nService "{0}" found'.format(zc_name))
print('\tType: {0}'.format(zc_type))
if zc_info:
print('\tAddress: {0}:{1}'.format(socket.inet_ntoa(zc_info.address), zc_info.port))
print('\tServer name: {0}'.format(zc_info.server))
prop = zc_info.properties
if prop:
print('\tProperties:')
for key, value in prop.items():
if not value: continue
(key, value) = (key.decode('ascii'), value.decode('ascii'))
print('\t\t{0}: {1}'.format(key, value))
def get_ds1000z_results(if_any_return_after=0.8, timeout=2.5):
"""
Zeroconf service discovery of "_scpi-raw._tcp.local."
The results are filtered for entries matching the Rigol DS1000Z scope series.
"""
zc = Zeroconf()
def ds1000z_filter(result):
check_results = [
re.match(b'DS1\d\d\dZ', result['zc_info'].properties[b'Model']),
re.match(b'RIGOL TECHNOLOGIES', result['zc_info'].properties[b'Manufacturer']),
]
if not all(check_results):
return False
return True
listener = Listener(filter_func=ds1000z_filter)
browser = ServiceBrowser(zc, '_scpi-raw._tcp.local.', listener=listener)
start = clock()
while True:
# Because multithreading sucks.
et = clock() - start # elapsed time
if len(listener.results) and et >= if_any_return_after:
break
if et >= timeout:
break
time.sleep(0.005)
zc.close()
return listener.results
def get_scpi_connection_tuple(http_connection_tuple):
"""
* Get XML config from http://<address>:<port>/lxi/identification
* My scope has malformed XML in the namespace attributes, where there is a newline before the closing quote, causing
the parser to bork. We should just be able to concat the whole string together by removing newlines.
* Use XPath selector: "ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString" with the
"http://www.lxistandard.org/InstrumentIdentification/1.0" namespace.
* For each InstrumentAddressString, split on "::" and look for an IP address followed by a port
* My scope yields a VISA type of "INSTR" for both TCPIP interfaces, when technically it should be "SOCKET" I think
(see: http://zone.ni.com/reference/en-XX/help/371361J-01/lvinstio/visa_resource_name_generic/ and
http://digital.ni.com/public.nsf/allkb/6A9285AC83C646BA86256BDC004FD4D4)
* Guessing that an address with no port, or port 80, is the web interface, assume the first one we come across with a
high-range port is our SCPI interface.
* By convention this is port 5025, but Rigol has chosen 5555.
"""
lxi_ident_url = 'http://{0}:{1}/lxi/identification'.format(*http_connection_tuple)
r = requests.get(lxi_ident_url)
doc = etree.fromstring(r.content.replace(b'\n', b''))
scpi_address = None
scpi_port = None
for e in doc.xpath("ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString", namespaces={'ns': 'http://www.lxistandard.org/InstrumentIdentification/1.0'}):
visa_resource = e.text.split('::')
interface_type = visa_resource[0]
if interface_type.startswith('TCPIP'):
address = visa_resource[1:-1]
if len(address) == 2 and int(address[1]) > 1024:
# This is most likely our SCPI address.
scpi_address = address[0]
scpi_port = int(address[1])
break
return (scpi_address, scpi_port)
def test_scpi(scpi_connection_tuple):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect(scpi_connection_tuple)
s.send(b':TRIG:STAT?')
trig_status = s.recv(32)
if trig_status.strip() == b'STOP':
print('Starting acquisition now...')
s.send(b':RUN')
else:
print('Stoping acquisition now...')
s.send(b':STOP')
s.close()
def main():
results = get_ds1000z_results()
for result in results:
Listener.pprint(**result)
print()
for result in results:
print('Trying to connect to {}...'.format(socket.inet_ntoa(result['zc_info'].address)))
scpi_connection = get_scpi_connection_tuple((socket.inet_ntoa(result['zc_info'].address), result['zc_info'].port))
if scpi_connection is not (None, None):
test_scpi(scpi_connection)
print()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment