Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Python script to send commands to a Rigol scope (or any LXI/SCPI instrument) from first principles.
from zeroconf import *
import socket
import time
import requests
from lxml import etree
Basic steps for a Rigol DS1000Z-series scope
* Zeroconf service discovery of "_scpi-raw._tcp.local."
* Get XML config from <address>/lxi/identification
* My scope has malformed XML in the namespace attributes, where there is a newline before the closing quote, causing
the parser to bork. We should just be able to concat the whole string together by removing newlines.
* Use XPath selector: "ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString" with the
"" namespace.
* For each InstrumentAddressString, split on "::" and look for an IP address followed by a port
* My scope yields a VISA type of "INSTR" for both TCPIP interfaces, when technically it should be "SOCKET" I think
(see: and
* Guessing that an address with no port, or port 80, is the web interface, assume the first one we come across with a
high-range port is our SCPI interface.
* By convention this is port 5025, but Rigol has chosen 5555.
Documentation worth looking at:
class MyListener(object):
def __init__(self):
self.zc_info = None
self.zc_name = None
self.zc_type = None
def remove_service(self, zeroconf, zc_type, zc_name):
print('Service "{0}" removed'.format(zc_name))
def add_service(self, zeroconf, zc_type, zc_name):
self.zc_type = zc_type
self.zc_name = zc_name
self.zc_info = zeroconf.get_service_info(zc_type, zc_name)
def pprint(info, zc_type, zc_name):
print('\nService "{0}" found'.format(zc_name))
print('\tType: {0}'.format(zc_type))
if info:
print('\tAddress: {0}:{1}'.format(socket.inet_ntoa(info.address), info.port))
print('\tServer name: {0}'.format(info.server))
prop =
if prop:
for key, value in prop.items():
print('\t\t{0}: {1}'.format(key, value))
if __name__ == '__main__':
r = Zeroconf()
listener = MyListener()
browser = ServiceBrowser(r, '_scpi-raw._tcp.local.', listener=listener)
while listener.zc_info is None:
# Because multithreading sucks.
MyListener.pprint(listener.zc_info, listener.zc_type, listener.zc_name)
r = requests.get('http://{0}:{1}/lxi/identification'.format(listener.zc_info.server, listener.zc_info.port))
concat = ''.join(r.content.split('\n'))
doc = etree.fromstring(concat)
scpi_address = None
scpi_port = None
for e in doc.xpath("ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString", namespaces={'ns': ''}):
visa_resource = e.text.split('::')
interface_type = visa_resource[0]
if interface_type.startswith('TCPIP'):
address = visa_resource[1:-1]
if len(address) == 2 and int(address[1]) > 1024:
# This is most likely our SCPI address.
scpi_address = address[0]
scpi_port = int(address[1])
if scpi_port is not None and scpi_address is not None:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect((scpi_address, scpi_port))
trig_status = s.recv(32)
if trig_status.strip() == 'STOP':
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.