Skip to content

Instantly share code, notes, and snippets.

@GeoSpark
Created March 28, 2015 12:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save GeoSpark/c67b7d6ebdda55929fbd to your computer and use it in GitHub Desktop.
Save GeoSpark/c67b7d6ebdda55929fbd to your computer and use it in GitHub Desktop.
Python script to send commands to a Rigol scope (or any LXI/SCPI instrument) from first principles.
from zeroconf import *
import socket
import time
import requests
from lxml import etree
"""
Basic steps for a Rigol DS1000Z-series scope
--------------------------------------------
* Zeroconf service discovery of "_scpi-raw._tcp.local."
* Get XML config from <address>/lxi/identification
* My scope has malformed XML in the namespace attributes, where there is a newline before the closing quote, causing
the parser to bork. We should just be able to concat the whole string together by removing newlines.
* Use XPath selector: "ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString" with the
"http://www.lxistandard.org/InstrumentIdentification/1.0" namespace.
* For each InstrumentAddressString, split on "::" and look for an IP address followed by a port
* My scope yields a VISA type of "INSTR" for both TCPIP interfaces, when technically it should be "SOCKET" I think
(see: http://zone.ni.com/reference/en-XX/help/371361J-01/lvinstio/visa_resource_name_generic/ and
http://digital.ni.com/public.nsf/allkb/6A9285AC83C646BA86256BDC004FD4D4)
* Guessing that an address with no port, or port 80, is the web interface, assume the first one we come across with a
high-range port is our SCPI interface.
* By convention this is port 5025, but Rigol has chosen 5555.
Documentation worth looking at:
* http://lxistandard.org/Documents/Specifications/LXI%20Device%20Specification%202011%20rev%201.4.pdf
* http://lxistandard.org/GuidesForUsingLXI/Introducing%20LXI%20To%20Your%20Network%20Administrator%20May%2024_2013.pdf
* http://lxistandard.org/GuidesForUsingLXI/LXI_Getting_Started_Guide_May_1_2013.pdf
* http://beyondmeasure.rigoltech.com/acton/attachment/1579/f-0386/1/-/-/-/-/DS1000Z_Programming%20Guide_EN.pdf
"""
class MyListener(object):
def __init__(self):
self.zc_info = None
self.zc_name = None
self.zc_type = None
def remove_service(self, zeroconf, zc_type, zc_name):
print('Service "{0}" removed'.format(zc_name))
def add_service(self, zeroconf, zc_type, zc_name):
self.zc_type = zc_type
self.zc_name = zc_name
self.zc_info = zeroconf.get_service_info(zc_type, zc_name)
@staticmethod
def pprint(info, zc_type, zc_name):
print('\nService "{0}" found'.format(zc_name))
print('\tType: {0}'.format(zc_type))
if info:
print('\tAddress: {0}:{1}'.format(socket.inet_ntoa(info.address), info.port))
print('\tServer name: {0}'.format(info.server))
prop = info.properties
if prop:
print('\tProperties:')
for key, value in prop.items():
print('\t\t{0}: {1}'.format(key, value))
if __name__ == '__main__':
r = Zeroconf()
listener = MyListener()
browser = ServiceBrowser(r, '_scpi-raw._tcp.local.', listener=listener)
while listener.zc_info is None:
# Because multithreading sucks.
time.sleep(0.01)
r.close()
MyListener.pprint(listener.zc_info, listener.zc_type, listener.zc_name)
r = requests.get('http://{0}:{1}/lxi/identification'.format(listener.zc_info.server, listener.zc_info.port))
concat = ''.join(r.content.split('\n'))
doc = etree.fromstring(concat)
scpi_address = None
scpi_port = None
for e in doc.xpath("ns:Interface[@InterfaceType = 'LXI']/ns:InstrumentAddressString", namespaces={'ns': 'http://www.lxistandard.org/InstrumentIdentification/1.0'}):
visa_resource = e.text.split('::')
interface_type = visa_resource[0]
if interface_type.startswith('TCPIP'):
address = visa_resource[1:-1]
if len(address) == 2 and int(address[1]) > 1024:
# This is most likely our SCPI address.
scpi_address = address[0]
scpi_port = int(address[1])
break
if scpi_port is not None and scpi_address is not None:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
s.connect((scpi_address, scpi_port))
s.send(':TRIG:STAT?')
trig_status = s.recv(32)
if trig_status.strip() == 'STOP':
s.send(':RUN')
else:
s.send(':STOP')
s.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment