Skip to content

Instantly share code, notes, and snippets.

@sjlongland
Created March 20, 2024 05:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sjlongland/b3a579d4c702b967884b7a648de08bd7 to your computer and use it in GitHub Desktop.
Save sjlongland/b3a579d4c702b967884b7a648de08bd7 to your computer and use it in GitHub Desktop.
Dummy rigctld implementation for GPIO control of radio PTT
#!/usr/bin/env python3
"""
Dummy rigctld implementation for controlling a radio PTT signal via a GPIO
pin.
This is intended for devices like the NWDR UDRC-II and DRAWS boards, which
attach to the top of a Raspberry Pi 2/3/4/5 single-board computer and provide
control signals for PTT and COS along with an audio interface for digital mode
operation.
Many software packages support rigctld as a PTT mechanism, but not all support
GPIO control. This script mimics enough of the rigctld protocol to fool QSSTV
into thinking it's talking to rigctld, and thus allow it to control PTT via
the GPIO pin.
Usage (PTT on GPIO #23; as per UDRC-II mini-din6):
python3 dummyrigctld.py 23
"""
# © 2024 Stuart Longland VK4MSL
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import asyncio
import argparse
import logging
import weakref
import os.path
from functools import partial
class GPIOPin(object):
    """
    One GPIO line accessed through the sysfs tree at ``/sys/class/gpio``.

    The pin is exported and configured by :meth:`setup`, read and driven
    through the :attr:`value` property, and released by :meth:`teardown`.

    :param nbr: GPIO number; used to build the ``gpio<nbr>`` directory name
        and written to the ``export`` / ``unexport`` control files.
    :param direction: String written verbatim to the pin's ``direction``
        file (default ``"out"``).
    :param state: Level applied by :meth:`setup` once the pin is configured
        as ``"out"``.
    """

    _SYSFS_PATH = "/sys/class/gpio"
    _SYSFS_NAME = "gpio%d"
    _EXPORT = os.path.join(_SYSFS_PATH, "export")
    _UNEXPORT = os.path.join(_SYSFS_PATH, "unexport")

    @staticmethod
    def _read_from(file):
        """Return the whole text content of *file*."""
        with open(file, "r") as f:
            return f.read()

    @staticmethod
    def _write_to(file, data):
        """Write the string *data* to *file*; return the character count."""
        with open(file, "w") as f:
            return f.write(data)

    def __init__(self, nbr, direction="out", state=False):
        self._nbr = nbr
        self._direction = direction
        self._init_state = state
        # Per-pin directory and the two attribute files this class touches.
        self._path = os.path.join(self._SYSFS_PATH, self._SYSFS_NAME % nbr)
        self._dir = os.path.join(self._path, "direction")
        self._value = os.path.join(self._path, "value")

    @property
    def value(self):
        """Current pin level as a bool, parsed from the ``value`` file."""
        return bool(int(self._read_from(self._value)))

    @value.setter
    def value(self, value):
        # Any truthy value drives "1", anything else drives "0".
        self._write_to(self._value, "1" if value else "0")

    async def setup(self):
        """
        Export the pin, set its direction and apply the initial state.

        The direction write is retried every 0.5 s for as long as it raises
        ``OSError``, because the per-pin files may not be writable
        immediately after the export.

        :raises OSError: if the write to the ``export`` file fails.
        """
        self._write_to(self._EXPORT, str(self._nbr))

        # Wait for the device to be exported and become writable
        while True:
            try:
                self._write_to(self._dir, self._direction)
                break
            except OSError:
                await asyncio.sleep(0.5)

        # Set initial state.  This must go through the ``value`` property so
        # that it actually reaches the pin; assigning to an unrelated
        # attribute would leave the line at whatever level it had before.
        # Restricted to plain outputs so that other directions see no extra
        # write compared to before.
        if self._direction == "out":
            self.value = self._init_state

    def teardown(self):
        """Drive the pin low, then hand it back by writing to ``unexport``."""
        self.value = False
        self._write_to(self._UNEXPORT, str(self._nbr))
class RigCtlServer(object):
    """
    Minimal TCP server that answers a subset of rigctld commands and maps
    PTT requests onto a GPIO pin object.

    :param gpio: Object with a readable/writable boolean ``value``
        attribute; written on set_ptt and read on get_ptt.
    :param address: Address to listen on (defaults to :attr:`ADDRESS`).
    :param port: TCP port to listen on (defaults to :attr:`PORT`).
    :param loop: Event loop to use.  When omitted, the running loop is used;
        if none is running yet it is looked up when :meth:`start` is awaited.
    :param log: Logger to use; defaults to one named after the class.
    """

    ADDRESS = "127.0.0.1"
    PORT = 4532

    def __init__(
        self, gpio, address=ADDRESS, port=PORT, loop=None, log=None
    ):
        if log is None:
            log = logging.getLogger(self.__class__.__name__)

        if loop is None:
            # Outside of a running loop there is nothing sensible to bind
            # to yet, so defer the lookup to start().
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                loop = None

        self._gpio = gpio
        self._log = log
        self._loop = loop
        self._address = address
        self._port = port

    async def start(self):
        """
        Open the listening socket and serve clients until cancelled.

        A failure to create the listening socket is logged and re-raised.
        Exceptions that end ``serve_forever()`` (including cancellation)
        propagate without being reported as a socket-open failure.
        """
        if self._loop is None:
            self._loop = asyncio.get_running_loop()

        try:
            server = await self._loop.create_server(
                self._on_connect_start, self._address, self._port
            )
        except Exception:
            self._log.exception("Failed to open listening socket")
            raise

        self._log.info(
            "Server listening %s port %s", self._address, self._port
        )
        async with server:
            await server.serve_forever()

    def _on_connect_start(self):
        """Protocol factory: one RigCtlClient per incoming connection."""
        return RigCtlClient(self)

    def _exec(self, client, cmd, args):
        """
        Execute one command received from *client*.

        :param client: Connection object; replies are sent through its
            ``write(str)`` method and its ``_chk_vfo_executed`` flag is
            read and set here.
        :param cmd: Command word (first token of the received line).
        :param args: Remainder of the line after the first space, or ``""``.
        :raises ValueError: for an unrecognised command, or when the set_ptt
            argument is not an integer.
        """
        if cmd in ("\\set_ptt", "T"):
            self._set_ptt(bool(int(args)))
            client.write("RPRT 0\n")
        elif cmd in ("\\get_ptt", "t"):
            # "t" is the short form matching the "T" accepted for set_ptt.
            res = self._get_ptt()
            client.write("%d\n" % (1 if res else 0))
        elif cmd == "\\get_powerstat":
            # Always report powered on
            client.write("1\n")
        elif cmd == "\\chk_vfo":
            # Client is asking us for VFO data; remember that it did, since
            # the dump_state reply is extended for such clients.
            client._chk_vfo_executed = True
            client.write("0\n")
        elif cmd == "\\dump_state":
            self._dump_state(client)
        else:
            raise ValueError("Unknown command %r (args %r)" % (cmd, args))

    def _dump_state(self, client):
        """Send the fixed capability report for a rig with no features."""
        client.write(
            "1\n"  # Protocol v1
            "6\n"  # Model 6: Dummy No VFO
            "0\n"  # ITU region -- not used
            "0 0 0 0 0 0 0\n"  # end of rx range list
            "0 0 0 0 0 0 0\n"  # end of tx range list
            "0 0\n"  # end of tuning steps
            "0 0\n"  # end of filters
            "0\n"  # max_rit
            "0\n"  # max_xit
            "0\n"  # max_ifshift
            "0\n"  # announces
            "0 0 0 0 0 0 0 0\n"  # preamp
            "0 0 0 0 0 0 0 0\n"  # attenuator
            "0x00000000\n"  # has_get_func
            "0x00000000\n"  # has_set_func
            "0x00000000\n"  # has_get_level
            "0x00000000\n"  # has_set_level
            "0x00000000\n"  # has_get_parm
            "0x00000000\n"  # has_set_parm
        )
        if client._chk_vfo_executed:
            client.write(
                "vfo_opts=0x00000000\n"
                "ptt_type=0x00000001\n"  # PTT=RIG
                "targetable_vfo=0x00000000\n"
                "has_set_vfo=0\n"
                "has_get_vfo=0\n"
                "has_set_freq=0\n"
                "has_get_freq=0\n"
                "has_set_conf=0\n"
                "has_get_conf=0\n"
                "has_power2mW=0\n"
                "has_mw2power=0\n"
                "timeout=0\n"
                "rig_model=6\n"
                "rigctl_version=4.5.5\n"
                "agc_levels=\n"
                "done\n"
            )
        client.write("0\n")  # OK

    def _set_ptt(self, state):
        """Drive the GPIO to *state* and log the change."""
        self._gpio.value = state
        self._log.info("PTT <= %r", state)

    def _get_ptt(self):
        """Read the GPIO, log the level and return it."""
        state = self._gpio.value
        self._log.info("PTT => %r", state)
        return state
class RigCtlClient(asyncio.Protocol):
    """
    Protocol handler for a single client connection.

    Received bytes are accumulated in a buffer, split on newlines, and each
    complete line is handed to the owning server's ``_exec`` as a command
    word plus an argument string.

    :param server: Owning server object.  Only a weak reference is kept;
        its ``_log`` and ``_loop`` attributes are read at construction.
    """

    def __init__(self, server):
        self._server = weakref.ref(server)
        self._transport = None
        self._log = server._log.getChild("new")
        self._loop = server._loop
        self._rx_buffer = bytearray()
        # Set by the server once this client has issued \chk_vfo.
        self._chk_vfo_executed = False

    def connection_made(self, transport):
        """Remember the transport and switch to a per-peer logger."""
        # Store the transport first so every later code path (including the
        # "server is gone" path in _scan_buffer) has something to close.
        self._transport = transport

        server = self._server()
        if server is None:
            transport.close()
            return

        # The peer name is not always a 2-tuple: it can carry extra fields
        # or be missing altogether, so never unpack it blindly.
        peer = transport.get_extra_info("peername")
        if isinstance(peer, (tuple, list)) and len(peer) >= 2:
            label = "peer:%s#%d" % (peer[0], peer[1])
        else:
            label = "peer:%s" % (peer,)

        self._log = server._log.getChild(label)
        self._log.info("Connected")

    def data_received(self, data):
        """Buffer incoming bytes and schedule a scan for complete lines."""
        self._log.debug("Received %r", data)
        self._rx_buffer += data
        self._loop.call_soon(self._scan_buffer)

    def _scan_buffer(self):
        """
        Extract and execute at most one buffered command line.

        If more data remains after the extracted line, another scan is
        scheduled on the loop.  Any exception raised while decoding or
        executing the command is logged, the remaining buffered input is
        discarded and the connection is closed.
        """
        server = self._server()
        if server is None:
            self._transport.close()
            return

        # Rigctl is a line-oriented protocol, end of command is a newline.
        try:
            end = self._rx_buffer.index(b"\n")
        except ValueError:
            self._log.debug("Partial content: %r", bytes(self._rx_buffer))
            return

        cmd = bytes(self._rx_buffer[:end])
        self._rx_buffer = self._rx_buffer[end + 1 :]
        self._log.debug(
            "Received %r, remainder: %r", cmd, bytes(self._rx_buffer)
        )
        if len(self._rx_buffer):
            self._loop.call_soon(self._scan_buffer)

        try:
            cmd = cmd.decode("US-ASCII")
            if " " in cmd:
                (cmd, args) = cmd.split(" ", 1)
            else:
                (cmd, args) = (cmd, "")

            self._log.info("Received %r args %r", cmd, args)
            server._exec(self, cmd, args)
        except Exception:
            self._log.exception("Failed to handle %r", cmd)
            # Drop whatever is still queued so that already-scheduled scans
            # do not execute further commands on a connection being closed.
            del self._rx_buffer[:]
            self._transport.close()

    def connection_lost(self, exc):
        """Log the disconnect together with its cause, if any."""
        self._log.info("Disconnected (exc=%r)", exc)

    def write(self, data):
        """Encode the string *data* as US-ASCII and send it to the peer."""
        self._log.debug("Sending %r", data)
        self._transport.write(data.encode("US-ASCII"))
if __name__ == "__main__":

    def _parse_cli():
        """Build the command-line parser and return the parsed options."""
        parser = argparse.ArgumentParser(
            description="Mock rigctld for controlling a radio via GPIO"
        )
        # Optional settings first, then the mandatory PTT pin number.
        parser.add_argument(
            "--log-level",
            default="info",
            choices=("debug", "info", "warning", "error"),
            type=str,
            help="Debugging level",
        )
        parser.add_argument(
            "--address",
            default=RigCtlServer.ADDRESS,
            type=str,
            help="Bind address for server",
        )
        parser.add_argument(
            "--port",
            default=RigCtlServer.PORT,
            type=int,
            help="Bind port number for server",
        )
        parser.add_argument(
            "gpio", type=int, help="GPIO pin number for PTT"
        )
        return parser.parse_args()

    async def main():
        """
        Script entry point: configure logging, set up the PTT pin and run
        the server, tearing the pin down again when the server stops.
        """
        opts = _parse_cli()
        logging.basicConfig(level=opts.log_level.upper())

        ptt_pin = GPIOPin(opts.gpio)
        await ptt_pin.setup()
        try:
            rig_server = RigCtlServer(
                gpio=ptt_pin, address=opts.address, port=opts.port
            )
            await rig_server.start()
        finally:
            # Always release the pin, however the server came to an end.
            ptt_pin.teardown()

    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment