Skip to content

Instantly share code, notes, and snippets.

@Allexik
Created March 20, 2023 09:50
Show Gist options
  • Save Allexik/b64f0eb3112c75fc86295e6b6c88068e to your computer and use it in GitHub Desktop.
Save Allexik/b64f0eb3112c75fc86295e6b6c88068e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (C) 2018-2022 Vasily Evseenko <svpcom@p2ptech.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
#
import time
import json
import os
import sys
import re
import hashlib
import json
from wfb_ng.antenna.antenna_protocols import AntennaFactory, AntennaProtocol
from wfb_ng import log
from twisted.internet import reactor, defer, main as ti_main
from twisted.internet.protocol import ProcessProtocol, DatagramProtocol, Protocol, Factory
from twisted.protocols.basic import LineReceiver
from twisted.internet.serialport import SerialPort
from twisted.python import failure
from wfb_ng.common import abort_on_crash, exit_status, df_sleep
from wfb_ng.lora.lora_state_management import LoraStatusOnGSManager
from wfb_ng.lora.telemetry_filter import MavlinkFilter
from wfb_ng.proxy import UDPProxyProtocol, SerialProxyProtocol, ARMProtocol, call_and_check_rc, ExecError
from wfb_ng.tuntap import TUNTAPProtocol, TUNTAPTransport
from wfb_ng.conf import settings, cfg_files
from wfb_ng.lora.lora_initializer import init_lora_tx, init_lora_rx
from wfb_ng.sdr.sdr_protocol import SDRManager
from wfb_ng.www import server as ui_server
# Peer address forms accepted in the settings (matched case-insensitively):
#   connect://<ipv4>:<port>  -- groups: addr, port
#   listen://<ipv4>:<port>   -- groups: addr, port
#   serial:<device>:<baud>   -- groups: dev, baud
connect_re = re.compile(
    r'^connect://(?P<addr>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P<port>[0-9]+)$',
    flags=re.IGNORECASE,
)
listen_re = re.compile(
    r'^listen://(?P<addr>[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+):(?P<port>[0-9]+)$',
    flags=re.IGNORECASE,
)
serial_re = re.compile(
    r'^serial:(?P<dev>[a-z0-9\-\_/]+):(?P<baud>[0-9]+)$',
    flags=re.IGNORECASE,
)
class DbgProtocol(LineReceiver):
    """Line-oriented sink that writes every received line to the log.

    Each line is logged as ``"<rx_id>: <text>"``.  Lines are delimited by a
    single ``\\n`` byte.

    :param rx_id: label prepended to every logged line
    """

    delimiter = b'\n'

    def __init__(self, rx_id):
        self.rx_id = rx_id

    def lineReceived(self, line):
        """Log one received line (bytes) prefixed with ``rx_id``."""
        # The bytes come from an external source and are not guaranteed to be
        # valid UTF-8.  Decode leniently so a stray byte is logged as U+FFFD
        # instead of raising UnicodeDecodeError out of the data callback.
        text = line.decode('utf-8', errors='replace')
        log.msg('%s: %s' % (self.rx_id, text))
class RXProtocol(ProcessProtocol):
    """Process protocol wrapping one receiver child process.

    Child stdout is handed to an ``AntennaProtocol`` (when *antenna_stat* is
    given), child stderr is handed to a ``DbgProtocol`` which logs it line by
    line.  ``self.df`` fires when the process ends: callback on exit code 0,
    errback otherwise.

    :param antenna_stat: object passed to ``AntennaProtocol``; ``None`` disables stdout handling
    :param cmd: argv list of the process to spawn (``cmd[0]`` is the executable)
    :param rx_id: label used in log messages
    """

    def __init__(self, antenna_stat, cmd, rx_id):
        self.cmd = cmd
        self.rx_id = rx_id
        if antenna_stat is None:
            self.ant = None
        else:
            self.ant = AntennaProtocol(antenna_stat, rx_id)
        self.dbg = DbgProtocol(rx_id)
        self.df = defer.Deferred()

    def connectionMade(self):
        log.msg('Started %s' % (self.rx_id,))

    def outReceived(self, data):
        # stdout is only consumed when an antenna protocol was configured
        antenna = self.ant
        if antenna is None:
            return
        antenna.dataReceived(data)

    def errReceived(self, data):
        # stderr goes to the line logger
        self.dbg.dataReceived(data)

    def processEnded(self, status):
        exit_code = status.value.exitCode
        log.msg('Stopped RX %s with code %s' % (self.rx_id, exit_code))
        if exit_code != 0:
            self.df.errback(status)
            return
        self.df.callback(str(status.value))

    def start(self):
        """Spawn the process; return a Deferred that resolves to ``self.df``."""
        spawned = defer.maybeDeferred(
            reactor.spawnProcess, self, self.cmd[0], self.cmd,
            env=os.environ, childFDs={0: "w", 1: "r", 2: "r"})
        spawned.addCallback(lambda _: self.df)
        return spawned
class TXProtocol(ProcessProtocol):
    """Process protocol wrapping one transmitter child process.

    Both stdout and stderr of the child are handed to a ``DbgProtocol`` which
    logs them line by line.  ``self.df`` fires when the process ends:
    callback on exit code 0, errback otherwise.

    :param cmd: argv list of the process to spawn (``cmd[0]`` is the executable)
    :param tx_id: label used in log messages
    """

    def __init__(self, cmd, tx_id):
        self.cmd = cmd
        self.tx_id = tx_id
        self.dbg = DbgProtocol(tx_id)
        self.df = defer.Deferred()

    def connectionMade(self):
        log.msg('Started %s' % (self.tx_id,))

    def outReceived(self, data):
        self.dbg.dataReceived(data)

    def errReceived(self, data):
        self.dbg.dataReceived(data)

    def processEnded(self, status):
        exit_code = status.value.exitCode
        log.msg('Stopped TX %s with code %s' % (self.tx_id, exit_code))
        if exit_code != 0:
            self.df.errback(status)
            return
        self.df.callback(str(status.value))

    def start(self):
        """Spawn the process; return a Deferred that resolves to ``self.df``."""
        spawned = defer.maybeDeferred(
            reactor.spawnProcess, self, self.cmd[0], self.cmd,
            env=os.environ, childFDs={0: "w", 1: "r", 2: "r"})
        spawned.addCallback(lambda _: self.df)
        return spawned
@defer.inlineCallbacks
def init_wlans(profile, wlans):
    """Switch every interface in *wlans* to monitor mode on the configured channel.

    The HT mode is derived from the larger of the ``bandwidth`` values of the
    ``<profile>_mavlink`` and ``<profile>_video`` settings sections.  For each
    interface the function optionally marks it unmanaged in NetworkManager,
    then runs the ``ip`` / ``iw`` commands to set monitor mode, frequency and
    (if configured) a fixed tx power.

    :param profile: settings profile name
    :param wlans: iterable of wlan interface names
    :raises Exception: if the bandwidth is neither 20 nor 40
    :raises ExecError: re-raised after logging its stdout/stderr
    """
    mavlink_bw = getattr(getattr(settings, '%s_mavlink' % profile), 'bandwidth')
    video_bw = getattr(getattr(settings, '%s_video' % profile), 'bandwidth')
    max_bw = max(mavlink_bw, video_bw)

    ht_mode = {20: 'HT20', 40: 'HT40+'}.get(max_bw)
    if ht_mode is None:
        raise Exception('Unsupported bandwith %d MHz' % (max_bw,))

    try:
        yield call_and_check_rc('iw', 'reg', 'set', settings.common.wifi_region)

        for wlan in wlans:
            if settings.common.set_nm_unmanaged and os.path.exists('/usr/bin/nmcli'):
                nm_info = yield call_and_check_rc('nmcli', 'device', 'show', wlan, log_stdout=False)
                if b'(unmanaged)' not in nm_info:
                    log.msg('Switch %s to unmanaged state' % (wlan,))
                    yield call_and_check_rc('nmcli', 'device', 'set', wlan, 'managed', 'no')
                    yield df_sleep(1)

            # link down -> monitor mode -> link up, strictly in this order
            monitor_sequence = (
                ('ip', 'link', 'set', wlan, 'down'),
                ('iw', 'dev', wlan, 'set', 'monitor', 'otherbss'),
                ('ip', 'link', 'set', wlan, 'up'),
            )
            for argv in monitor_sequence:
                yield call_and_check_rc(*argv)

            yield call_and_check_rc('iw', 'dev', wlan, 'set', 'freq', str(settings.common.wifi_channel), ht_mode)

            if settings.common.wifi_txpower:
                yield call_and_check_rc('iw', 'dev', wlan, 'set', 'txpower', 'fixed', str(settings.common.wifi_txpower))
    except ExecError as err:
        # Surface whatever the failed command printed, then propagate.
        for captured in (err.stdout, err.stderr):
            if captured:
                log.msg(captured, isError=1)
        raise
def run_ui():
    """Hand ``ui_server.start`` to ``reactor.callInThread`` and return its result."""
    ui_entry_point = ui_server.start
    return reactor.callInThread(ui_entry_point)
def init(profile, wlans):
    """Initialise the wlan interfaces, then start all services for *profile*.

    After ``init_wlans`` succeeds, the mavlink, video and tunnel services and
    the UI are started together.  The link id handed to the services is the
    big-endian integer formed by the first three bytes of the SHA-1 digest of
    ``settings.common.link_id``.

    :param profile: settings profile name
    :param wlans: list of wlan interface names
    :return: Deferred for the whole start-up chain; a ``FirstError`` from the
        gathered services is replaced by its underlying sub-failure
    """
    def _unwrap_first_error(f):
        return f.trap(defer.FirstError) and f.value.subFailure

    def _init_services(_):
        digest = hashlib.sha1(settings.common.link_id.encode('utf-8')).digest()
        link_id = int.from_bytes(digest[:3], 'big')

        services = [defer.maybeDeferred(starter, profile, wlans, link_id)
                    for starter in (init_mavlink, init_video, init_tunnel)]
        services.append(defer.maybeDeferred(run_ui))

        return defer.gatherResults(services).addErrback(_unwrap_first_error)

    wlans_ready = defer.maybeDeferred(init_wlans, profile, wlans)
    return wlans_ready.addCallback(_init_services)
def init_mavlink(profile, wlans, link_id):
    """Set up the telemetry (mavlink) link for *profile* and spawn its processes.

    Reads the ``<profile>_mavlink`` settings section, builds the ``wfb_rx`` and
    ``wfb_tx`` command lines, creates the local endpoint selected by
    ``cfg.peer`` (UDP connect, UDP listen or a serial port), wires in the Lora
    TX or Lora RX side when the corresponding ``lora_*`` serial settings match
    ``serial_re``, opens the UDP sockets and finally starts the child
    processes.

    :param profile: settings profile name (prefix of the ``_mavlink`` section)
    :param wlans: list of wlan interface names; appended to both command lines
    :param link_id: integer passed to the binaries via ``-i``
    :return: Deferred gathering the spawned processes.  ``_cleanup`` runs on
        both success and failure, and a ``FirstError`` is replaced by its
        underlying sub-failure.
    :raises Exception: if ``cfg.peer`` matches none of the supported forms
    """
    cfg = getattr(settings, '%s_mavlink' % (profile,))
    # Each command line is formatted as one string and split on whitespace;
    # the wlan names are appended afterwards as separate arguments.
    cmd_rx = ('%s -p %d -u %d -K %s -k %d -n %d -i %d' % \
              (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
               cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n,
               link_id)).split() + wlans
    cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' % \
              (os.path.join(settings.path.bin_dir, 'wfb_tx'),
               cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
               cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
               cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id)).split() + wlans
    # At most one of connect / listen / serial is set from cfg.peer below.
    listen = None
    connect = None
    serial = None
    mirror = None
    if connect_re.match(cfg.peer):
        m = connect_re.match(cfg.peer)
        connect = m.group('addr'), int(m.group('port'))
        log.msg('Connect telem stream %d(RX), %d(TX) to %s:%d' % (cfg.stream_rx, cfg.stream_tx, connect[0], connect[1]))
    elif listen_re.match(cfg.peer):
        m = listen_re.match(cfg.peer)
        listen = m.group('addr'), int(m.group('port'))
        log.msg(
            'Listen for telem stream %d(RX), %d(TX) on %s:%d' % (cfg.stream_rx, cfg.stream_tx, listen[0], listen[1]))
    elif serial_re.match(cfg.peer):
        m = serial_re.match(cfg.peer)
        serial = m.group('dev'), int(m.group('baud'))
        log.msg('Open serial port %s on speed %d' % (serial[0], serial[1]))
    else:
        raise Exception('Unsupport peer address: %s' % (cfg.peer,))
    # NOTE(review): `mirror` is parsed and logged here but is not passed to any
    # protocol later in this function (the UDP proxies below get a fixed
    # 127.0.0.1 mirror address instead) -- confirm this is intended.
    if cfg.mirror is not None and connect_re.match(cfg.mirror):
        m = connect_re.match(cfg.mirror)
        mirror = m.group('addr'), int(m.group('port'))
        log.msg('Mirror telem stream to %s:%d' % (mirror[0], mirror[1]))
    # The arm/disarm hook protocol is only created when at least one hook is configured.
    if cfg.call_on_arm or cfg.call_on_disarm:
        arm_proto = ARMProtocol(cfg.call_on_arm, cfg.call_on_disarm)
    else:
        arm_proto = None
    lora_status_manager = None
    # p_in is the local endpoint: a serial proxy when cfg.peer is serial:...,
    # otherwise a UDP proxy.
    if serial:
        # Drone <--> Flight Controller
        p_in = SerialProxyProtocol(agg_max_size=settings.common.radio_mtu,
                                   agg_timeout=settings.common.mavlink_agg_timeout,
                                   inject_rssi=True,
                                   arm_proto=arm_proto,
                                   mavlink_sys_id=cfg.mavlink_sys_id,
                                   mavlink_comp_id=cfg.mavlink_comp_id)
    else:
        # The first argument is not None only if we initiate mavlink connection
        # GS <--> Mission Planner
        lora_status_manager = LoraStatusOnGSManager()
        p_in = UDPProxyProtocol(connect, agg_max_size=settings.common.radio_mtu,
                                agg_timeout=settings.common.mavlink_agg_timeout,
                                inject_rssi=cfg.inject_rssi,
                                # only controlling msgs should be mirrored
                                mirror=('127.0.0.1', settings.lora_tx.lora_tx_internal_port),
                                mirror_all=False,  # only controlling msgs (lora) will be mirrored
                                arm_proto=arm_proto,
                                mavlink_sys_id=cfg.mavlink_sys_id,
                                mavlink_comp_id=cfg.mavlink_comp_id,
                                split_before_write=True,
                                lora_status_manager=lora_status_manager)
    # Listening UDP ports collected here are closed in _cleanup.
    sockets = []
    # Serial ports of the Lora side; any that get opened are closed in _cleanup.
    lora_serial_tx_port_1 = lora_serial_tx_port_2 = lora_serial_rx_port_1 = lora_serial_rx_port_2 = None
    # [GS] with Lora
    # Taken when at least one lora_tx_N serial setting matches serial_re.
    if (settings.lora_tx_1.lora_tx_serial is not None and serial_re.match(settings.lora_tx_1.lora_tx_serial) or
            settings.lora_tx_2.lora_tx_serial is not None and serial_re.match(settings.lora_tx_2.lora_tx_serial)):
        # An unset serial setting is passed on as a failed match (None).
        lora_tx_listener, lora_serial_tx_port_1, lora_serial_tx_port_2 = \
            init_lora_tx(
                arm_proto,
                serial_re.match(settings.lora_tx_1.lora_tx_serial or ''),
                serial_re.match(settings.lora_tx_2.lora_tx_serial or ''))
        sockets += [reactor.listenUDP(settings.lora_tx.lora_tx_internal_port, lora_tx_listener)]
        # One TX proxy per wlan, addressed to consecutive ports starting at cfg.port_tx.
        p_tx_l = [UDPProxyProtocol(
            addr=('127.0.0.1', cfg.port_tx + i),
            split_before_write=True,
            arm_proto=arm_proto,
            # everything should be mirrored
            mirror=('127.0.0.1', settings.lora_tx.lora_tx_internal_port),
            mirror_all=True,
            noisy=False
        ) for i, _ in enumerate(wlans)]
    # [Drone] with Lora
    # Taken when at least one lora_rx_N serial setting matches serial_re.
    elif (settings.lora_rx_1.lora_rx_serial is not None and serial_re.match(settings.lora_rx_1.lora_rx_serial) or
          settings.lora_rx_2.lora_rx_serial is not None and serial_re.match(settings.lora_rx_2.lora_rx_serial)):
        mavlink_filter = MavlinkFilter()
        p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + i),
                                   mavlink_sys_id=cfg.mavlink_sys_id,
                                   mavlink_comp_id=cfg.mavlink_comp_id,
                                   inject_rssi=True) for i, _ in enumerate(wlans)]

        def _start_with_lora(lora_number):
            # Initialise the Lora RX side described by settings.lora_rx_<lora_number>
            # and return the (proxy, serial port) pair from init_lora_rx.
            lora_settings = getattr(settings, f'lora_rx_{lora_number}')
            if settings.sdr.sdr_enabled:
                # The SDR manager is created and queried for the first
                # configured channel; the object is not kept afterwards.
                log.msg('[SDR] testing START <<<<')
                sdr_manager = SDRManager(sdr_path=settings.sdr.sdr_path,
                                         channels=json.loads(lora_settings.channels),
                                         bandwidth=settings.sdr.bandwidth,
                                         sample_rate=settings.sdr.sample_rate)
                sdr_manager.get_new_channel(json.loads(lora_settings.channels)[0])
                log.msg('[SDR] testing END >>>>')
            # NOTE(review): the first argument is the literal 1 regardless of
            # lora_number, and only the first TX proxy is handed over --
            # confirm against init_lora_rx that this is intended.
            lora_serial_rx_proxy, lora_serial_rx_port = \
                init_lora_rx(1,
                             p_tx_l[0],
                             arm_proto,
                             serial_re.match(lora_settings.lora_rx_serial),
                             mavlink_filter=mavlink_filter)
            lora_serial_rx_proxy.peer = p_in
            return lora_serial_rx_proxy, lora_serial_rx_port

        # NOTE(review): lora_serial_rx_proxy_1 / _2 are not used after assignment;
        # only the serial ports are referenced later (in _cleanup).
        if (settings.lora_rx_1.lora_rx_serial is not None and
                serial_re.match(settings.lora_rx_1.lora_rx_serial)):
            lora_serial_rx_proxy_1, lora_serial_rx_port_1 = _start_with_lora(1)
        if (settings.lora_rx_2.lora_rx_serial is not None and
                serial_re.match(settings.lora_rx_2.lora_rx_serial)):
            lora_serial_rx_proxy_2, lora_serial_rx_port_2 = _start_with_lora(2)
    # without Lora
    else:
        log.msg('No Lora endpoints defined, "only wfb" mode ON')
        p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + i)) for i, _ in enumerate(wlans)]
    # p_rx is bound to cfg.port_rx below and has the local endpoint as its peer.
    p_rx = UDPProxyProtocol(arm_proto=arm_proto, noisy=cfg.noisy_rx)
    p_rx.peer = p_in
    if serial:
        serial_port = SerialPort(p_in, os.path.join('/dev', serial[0]), reactor, baudrate=serial[1])
        serial_port._serial.exclusive = True
    else:
        serial_port = None
        # UDP endpoint: bind to the listen port if given, otherwise port 0.
        sockets += [reactor.listenUDP(listen[1] if listen else 0, p_in)]
    sockets += [reactor.listenUDP(cfg.port_rx, p_rx)]
    sockets += [reactor.listenUDP(0, p_tx) for p_tx in p_tx_l]
    log.msg('Telem RX: %s' % (' '.join(cmd_rx),))
    log.msg('Telem TX: %s' % (' '.join(cmd_tx),))
    ant_f = AntennaFactory(p_in, p_tx_l, lora_status=lora_status_manager)
    if cfg.stats_port:
        reactor.listenTCP(cfg.stats_port, ant_f)
    # [Drone] disable normal wfb protocol to test only Lora msgs
    if settings.lora_rx.disable_wifi_rx:
        # Only the TX process is started in this mode.
        dl = [TXProtocol(cmd_tx, 'telem tx').start()]
        log.msg("!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        log.msg("Normal wifi communication from GS is disabled, only Lora packets will be handled.")
    else:
        dl = [RXProtocol(ant_f, cmd_rx, 'telem rx').start(), TXProtocol(cmd_tx, 'telem tx').start()]

    def _cleanup(x):
        # Close every serial port that was opened and stop all UDP listeners,
        # then pass the result/failure through unchanged.
        if serial_port is not None:
            serial_port.loseConnection()
            serial_port.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))
        if lora_serial_tx_port_1 is not None:
            lora_serial_tx_port_1.loseConnection()
            lora_serial_tx_port_1.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))
        if lora_serial_tx_port_2 is not None:
            lora_serial_tx_port_2.loseConnection()
            lora_serial_tx_port_2.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))
        if lora_serial_rx_port_1 is not None:
            lora_serial_rx_port_1.loseConnection()
            lora_serial_rx_port_1.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))
        if lora_serial_rx_port_2 is not None:
            lora_serial_rx_port_2.loseConnection()
            lora_serial_rx_port_2.connectionLost(failure.Failure(ti_main.CONNECTION_DONE))
        for s in sockets:
            s.stopListening()
        return x

    # Unwrap FirstError so callers see the failure of the process that died.
    return defer.gatherResults(dl, consumeErrors=True).addBoth(_cleanup) \
        .addErrback(lambda f: f.trap(defer.FirstError) and f.value.subFailure)
def init_video(profile, wlans, link_id):
    """Start the video stream process for *profile*.

    ``cfg.peer`` of the ``<profile>_video`` settings section selects the role:

    * ``listen://addr:port`` -- spawn ``wfb_tx`` with the port passed as
      ``-u``, using only the first wlan;
    * ``connect://addr:port`` -- spawn ``wfb_rx`` on all wlans with the
      address passed as ``-c`` / ``-u``; an ``AntennaFactory`` is created and
      served on ``cfg.stats_port`` when that is set.

    :param profile: settings profile name
    :param wlans: list of wlan interface names
    :param link_id: integer passed to the binary via ``-i``
    :return: the Deferred returned by the started protocol's ``start()``
    :raises Exception: if ``cfg.peer`` is neither a listen nor a connect address
    """
    cfg = getattr(settings, '%s_video' % (profile,))

    listen_match = listen_re.match(cfg.peer)
    connect_match = None if listen_match else connect_re.match(cfg.peer)

    if listen_match:
        listen = listen_match.group('addr'), int(listen_match.group('port'))
        log.msg('Listen for video stream %d on %s:%d' % (cfg.stream, listen[0], listen[1]))

        # We don't use TX diversity for video streaming due to only one transmitter on the vehicle
        tx_args = (os.path.join(settings.path.bin_dir, 'wfb_tx'), cfg.stream,
                   listen[1], os.path.join(settings.path.conf_dir, cfg.keypair),
                   cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
                   cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id, wlans[0])
        cmd = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d %s' % tx_args).split()

        df = TXProtocol(cmd, 'video tx').start()
    elif connect_match:
        connect = connect_match.group('addr'), int(connect_match.group('port'))
        log.msg('Send video stream %d to %s:%d' % (cfg.stream, connect[0], connect[1]))

        ant_f = AntennaFactory(None, None)
        if cfg.stats_port:
            reactor.listenTCP(cfg.stats_port, ant_f)

        rx_args = (os.path.join(settings.path.bin_dir, 'wfb_rx'),
                   cfg.stream, connect[0], connect[1],
                   os.path.join(settings.path.conf_dir, cfg.keypair),
                   cfg.fec_k, cfg.fec_n, link_id)
        cmd = ('%s -p %d -c %s -u %d -K %s -k %d -n %d -i %d' % rx_args).split() + wlans

        df = RXProtocol(ant_f, cmd, 'video rx').start()
    else:
        raise Exception('Unsupport peer address: %s' % (cfg.peer,))

    log.msg('Video: %s' % (' '.join(cmd),))
    return df
def init_tunnel(profile, wlans, link_id):
    """Set up the TUN/TAP tunnel for *profile* and spawn its RX/TX processes.

    Reads the ``<profile>_tunnel`` settings section, creates the tunnel
    endpoint and the UDP proxies (one TX proxy per wlan, addressed to
    consecutive ports starting at ``cfg.port_tx``), then starts ``wfb_rx``
    and ``wfb_tx``.

    :param profile: settings profile name
    :param wlans: list of wlan interface names; appended to both command lines
    :param link_id: integer passed to the binaries via ``-i``
    :return: Deferred gathering both processes.  The tunnel endpoint and the
        UDP listeners are closed on either outcome, and a ``FirstError`` is
        replaced by its underlying sub-failure.
    """
    cfg = getattr(settings, '%s_tunnel' % (profile,))

    rx_args = (os.path.join(settings.path.bin_dir, 'wfb_rx'), cfg.stream_rx,
               cfg.port_rx, os.path.join(settings.path.conf_dir, cfg.keypair), cfg.fec_k, cfg.fec_n,
               link_id)
    cmd_rx = ('%s -p %d -u %d -K %s -k %d -n %d -i %d' % rx_args).split() + wlans

    tx_args = (os.path.join(settings.path.bin_dir, 'wfb_tx'),
               cfg.stream_tx, cfg.port_tx, os.path.join(settings.path.conf_dir, cfg.keypair),
               cfg.bandwidth, "short" if cfg.short_gi else "long", cfg.stbc, cfg.ldpc, cfg.mcs_index,
               cfg.fec_k, cfg.fec_n, cfg.fec_timeout, link_id)
    cmd_tx = ('%s -p %d -u %d -K %s -B %d -G %s -S %d -L %d -M %d -k %d -n %d -T %d -i %d' % tx_args).split() + wlans

    p_in = TUNTAPProtocol()
    p_tx_l = [UDPProxyProtocol(('127.0.0.1', cfg.port_tx + offset))
              for offset, _wlan in enumerate(wlans)]
    p_rx = UDPProxyProtocol()
    p_rx.peer = p_in

    tun_ep = TUNTAPTransport(reactor, p_in, cfg.ifname, cfg.ifaddr, mtu=settings.common.radio_mtu)

    # RX listener first, then one ephemeral-port listener per TX proxy.
    sockets = [reactor.listenUDP(cfg.port_rx, p_rx)]
    for p_tx in p_tx_l:
        sockets.append(reactor.listenUDP(0, p_tx))

    log.msg('Tunnel RX: %s' % (' '.join(cmd_rx),))
    log.msg('Tunnel TX: %s' % (' '.join(cmd_tx),))

    ant_f = AntennaFactory(p_in, p_tx_l)
    if cfg.stats_port:
        reactor.listenTCP(cfg.stats_port, ant_f)

    rx_done = RXProtocol(ant_f, cmd_rx, 'tunnel rx').start()
    tx_done = TXProtocol(cmd_tx, 'tunnel tx').start()

    def _cleanup(result):
        tun_ep.loseConnection()
        for listener in sockets:
            listener.stopListening()
        return result

    def _unwrap_first_error(f):
        return f.trap(defer.FirstError) and f.value.subFailure

    gathered = defer.gatherResults([rx_done, tx_done], consumeErrors=True)
    gathered.addBoth(_cleanup)
    gathered.addErrback(_unwrap_first_error)
    return gathered
def main():
    """Command-line entry point.

    ``sys.argv[1]`` is the profile name; every following argument is split on
    whitespace and contributes wlan interface names.  Initialisation is
    scheduled to run once the reactor is up, and the process exits with the
    code reported by ``exit_status()`` after the reactor stops.
    """
    log.msg('WFB version %s-%s' % (settings.common.version, settings.common.commit[:8]))

    profile = sys.argv[1]
    wlans = [wlan for arg in sys.argv[2:] for wlan in arg.split()]

    host = os.uname()
    log.msg('Run on %s/%s @%s, profile %s using %s' % (host.machine, host.release, host.nodename, profile, ', '.join(wlans)))
    log.msg('Using cfg files:\n%s' % ('\n'.join(cfg_files),))

    def _start():
        # Any failure during init is routed to abort_on_crash.
        return defer.maybeDeferred(init, profile, wlans).addErrback(abort_on_crash)

    reactor.callWhenRunning(_start)
    reactor.run()

    rc = exit_status()
    log.msg('Exiting with code %d' % rc)
    sys.exit(rc)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment