Skip to content

Instantly share code, notes, and snippets.

@nanovna
Last active October 24, 2025 14:11
Show Gist options
  • Select an option

  • Save nanovna/af6d7c93221a5673451e6f6e64f210e7 to your computer and use it in GitHub Desktop.

Select an option

Save nanovna/af6d7c93221a5673451e6f6e64f210e7 to your computer and use it in GitHub Desktop.
Read points from NanoRFE VNA demo
#!/usr/bin/env python3
'''
MIT License
Copyright (c) 2022 NanoRFE
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
import serial
import argparse
import tty
import cmath
import sys
import os
import os.path
import time
import numpy
from struct import pack, unpack_from
from time import sleep
from math import pi,log10
from collections import namedtuple
SPar = namedtuple('SPar', 'S11 S21')
_CMD_NOP = 0x00
_CMD_INDICATE = 0x0D
_CMD_READ = 0x10
_CMD_READ2 = 0x11
_CMD_READ4 = 0x12
_CMD_READFIFO = 0x18
_CMD_WRITE = 0x20
_CMD_WRITE2 = 0x21
_CMD_WRITE4 = 0x22
_CMD_WRITE8 = 0x23
_CMD_WRITEFIFO = 0x28
_ADDR_SWEEP_START = 0x00
_ADDR_SWEEP_STEP = 0x10
_ADDR_SWEEP_POINTS = 0x20
_ADDR_SWEEP_VALS_PER_FREQ = 0x22
_ADDR_RAW_SAMPLES_MODE = 0x26
_ADDR_VALUES_FIFO = 0x30
_ADDR_DEVICE_VARIANT = 0xF0
_ADDR_PROTOCOL_VERSION = 0xF1
_ADDR_HARDWARE_REVISION = 0xF2
_ADDR_FW_MAJOR = 0xF3
_ADDR_FW_MINOR = 0xF4
WRITE_SLEEP = 0.05
class VNADevice_NanoRFE:
def __init__(self, serial):
self.serial = serial
tty.setraw(self.serial.fd)
# reset protocol to known state
self.serial.write(pack("<Q", 0))
sleep(WRITE_SLEEP)
firmwareVer = self.readVersion()
# firmware major version of 0xff indicates dfu mode
if firmwareVer[0] == 0xFF:
raise IOError("Device is in DFU mode")
self.sweepStartHz = 200e6
self.sweepStepHz = 1e6
self.sweepPoints = 101
self.sweepAvg = 1
self._updateSweep()
def readValues(self) -> list:
sweepData = [numpy.array((0j,0j,0j,0j)) for i in range(self.sweepPoints)]
sweepValueCount = [0] * self.sweepPoints
# reset protocol to known state
timeout = self.serial.timeout
self.serial.write(pack("<Q", 0))
sleep(WRITE_SLEEP)
# cmd: write register 0x30 to clear FIFO
self.serial.write(
pack("<BBB", _CMD_WRITE, _ADDR_VALUES_FIFO, 0)
)
sleep(WRITE_SLEEP)
# discard buffered values
self.serial.timeout = 0.05
self.serial.read(1024*32)
# we read at most 255 values at a time and the time required
# is roughly 0.3 seconds for 101 points or
# <1 second for 255 points, multiplied by averaging
self.serial.timeout = 1 * self.sweepAvg
pointsLeft = self.sweepPoints * self.sweepAvg
while pointsLeft > 0:
toRead = min(255, pointsLeft)
# cmd: read FIFO, addr 0x30
self.serial.write(
pack(
"<BBB",
_CMD_READFIFO,
_ADDR_VALUES_FIFO,
toRead,
)
)
sleep(WRITE_SLEEP)
# each value is 32 bytes
nBytes = toRead * 32
# serial .read() will try to read nBytes bytes in
# timeout secs
arr = self.serial.read(nBytes)
self._parsePoints(sweepData, sweepValueCount, arr, toRead)
pointsLeft -= toRead
self.serial.timeout = timeout
for i in range(self.sweepPoints):
if sweepValueCount[i] != self.sweepAvg:
raise RuntimeError('only received %d out of %d points for freqIndex %d' % (sweepValueCount[i], self.sweepAvg, i))
return sweepData
def _parsePoints(self, sweepData, sweepValueCount, arr, count) -> None:
freq_index = -1
for i in range(count):
(
fwd_real,
fwd_imag,
rev0_real,
rev0_imag,
rev1_real,
rev1_imag,
freq_index,
) = unpack_from("<iiiiiihxxxxxx", arr, i * 32)
fwd = complex(fwd_real, fwd_imag)
refl = complex(rev0_real, rev0_imag)
thru = complex(rev1_real, rev1_imag)
sweepData[freq_index] += (fwd, refl, 0.0, thru) # out, in, out, in
sweepValueCount[freq_index] += 1
def resetSweep(self, start: int, stop: int):
self.setSweep(start, stop)
def _read_version(self, cmd_0: int, cmd_1: int):
cmd = pack("<BBBB", _CMD_READ, cmd_0, _CMD_READ, cmd_1)
self.serial.write(cmd)
sleep(WRITE_SLEEP)
resp = self.serial.read(2)
if len(resp) != 2:
raise IOError("Timeout reading version registers")
return resp
def readVersion(self) -> "Version":
return self._read_version(_ADDR_FW_MAJOR, _ADDR_FW_MINOR)
def read_board_revision(self) -> "Version":
return self._read_version(
_ADDR_DEVICE_VARIANT, _ADDR_HARDWARE_REVISION
)
def setSweep(self, start, stop, nPoints, nAvg=1):
self.sweepPoints = nPoints
step = (stop - start) / (nPoints - 1)
if start == self.sweepStartHz and step == self.sweepStepHz:
return
self.sweepStartHz = start
self.sweepStepHz = step
self.sweepAvg = nAvg
self._updateSweep()
def _updateSweep(self):
cmd = pack(
"<BBQ",
_CMD_WRITE8,
_ADDR_SWEEP_START,
int(self.sweepStartHz),
)
cmd += pack(
"<BBQ", _CMD_WRITE8, _ADDR_SWEEP_STEP, int(self.sweepStepHz)
)
cmd += pack(
"<BBH", _CMD_WRITE2, _ADDR_SWEEP_POINTS, self.sweepPoints
)
cmd += pack("<BBH", _CMD_WRITE2, _ADDR_SWEEP_VALS_PER_FREQ, self.sweepAvg)
self.serial.write(cmd)
sleep(WRITE_SLEEP)
def _set_register(self, addr, value, size):
packet = b""
if size == 1:
packet = pack("<BBB", _CMD_WRITE, addr, value)
elif size == 2:
packet = pack("<BBH", _CMD_WRITE2, addr, value)
elif size == 4:
packet = pack("<BBI", _CMD_WRITE4, addr, value)
elif size == 8:
packet = pack("<BBQ", _CMD_WRITE8, addr, value)
self.serial.write(packet)
# returns array of tuples (S11, S21), raw values
def readSPar(self):
sys.stdout.write('reading %d points... ' % self.sweepPoints)
sys.stdout.flush()
arr = self.readValues()
ret = [None] * self.sweepPoints
for i in range(self.sweepPoints):
freq = self.sweepStartHz + self.sweepStepHz * i
pt = arr[i]
fwd = pt[0]
refl = pt[1]
thru = pt[3]
ret[i] = SPar(refl/fwd, thru/fwd)
print('')
return ret
# given raw measured values for short, open, load, return calibration coefficients tuple (X,Y,Z)
def SOL_compute_coefficients(S, O, L):
Z = (2.0*L-O-S)/(O-S)
X = L-S*(1.0-Z)
Y = L
return (X, Y, Z)
# given the calibration coefficients and a raw value, compute the reflection coefficient
def SOL_compute_reflection(coeffs, dut):
X,Y,Z = coeffs
return (Y-dut)/(dut*Z-X)
def measureCalStandards(vna):
print('connect SHORT and press enter')
sys.stdin.readline()
measShort = vna.readSPar()
print('connect OPEN and press enter')
sys.stdin.readline()
measOpen = vna.readSPar()
print('connect LOAD and press enter')
sys.stdin.readline()
measLoad = vna.readSPar()
return (measShort, measOpen, measLoad)
parser = argparse.ArgumentParser()
parser.add_argument(
"-s", "--serial", default="/dev/ttyACM0", help="Path to USB serial device"
)
args = parser.parse_args()
def demo_read_points():
nPoints = 101
freqStart = 500e6
freqEnd = 1500e6
freqStep = (freqEnd - freqStart) / (nPoints - 1)
averaging = 1
# open the device
ser = serial.Serial(args.serial)
vna = VNADevice_NanoRFE(ser)
# set sweep parameters
vna.setSweep(freqStart, freqEnd, nPoints, averaging)
# first measure short, open, load raw values
S, O, L = measureCalStandards(vna)
# measure DUT raw values
print('connect DUT and press enter')
sys.stdin.readline()
measDUT = vna.readSPar()
# apply calibration and print
radToDeg = 180./pi
for i in range(nPoints):
coeffs = SOL_compute_coefficients(S[i].S11, O[i].S11, L[i].S11)
calibratedS11 = SOL_compute_reflection(coeffs, measDUT[i].S11)
freqHz = freqStart + i*freqStep
print('%.3f %.5f %.5f' % (
freqHz * 1e-6,
abs(calibratedS11), cmath.phase(calibratedS11) * radToDeg))
demo_read_points()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment