Skip to content

Instantly share code, notes, and snippets.

@kk7ds
Last active January 1, 2022 17:06
Show Gist options
  • Save kk7ds/44560412f8887eaf39fd5f4ff125e3ab to your computer and use it in GitHub Desktop.
Save kk7ds/44560412f8887eaf39fd5f4ff125e3ab to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# Copyright 2022 Dan Smith <dsmith+kpc3@danplanet.com>
#
# This monitors an aprx rflog and compares packets heard on multiple
# interfaces to make sure they arrive reasonably close. It is intended
# to monitor a troublesome kpc3+ to make sure the packets received via
# it and APRSIS are close together. When using this, enable a wide
# range filter on your aprsis connection so that you receive things
# back from APRSIS you are likely to hear on RF.
#
# Run this like:
#
# tail /tmp/rflog | python3 kpc3mon.py
#
# For more information, see
# http://www.danplanet.com/blog/2021/12/31/fixing-kpc3-kiss-issues/
import collections
import datetime
import sys
import time
import colorama as C
def cprint(color, string):
    """Print ``string`` prefixed by ``color`` and followed by a style reset.

    ``color`` is expected to be a colorama escape-sequence string; the
    trailing ``C.Style.RESET_ALL`` keeps the color from leaking into
    whatever is printed next.
    """
    pieces = (color, string, C.Style.RESET_ALL)
    print(''.join(pieces))
class Packet:
    """One packet parsed from a single rflog line.

    Two packets are equal (and hash alike) when their source,
    destination and payload all match.  The interface, timestamp and
    path are deliberately ignored so that the same packet logged on
    two different interfaces can be paired up.
    """

    def __init__(self, line):
        """Parse ``DATE TIME IFACE ACTION SRC>DST[,PATH]:PAYLOAD``.

        Sets ``stamp`` (naive datetime), ``iface``, ``action``, ``src``,
        ``dst``, ``path`` (everything after the first comma of the
        address field, or '') , ``fpath`` (``path`` without the hops
        starting with 'WIDE') and ``payload``.

        Raises ValueError if the line does not have that shape.
        """
        # Named 'clock' so the imported time module is not shadowed.
        date, clock, self.iface, self.action, packet = line.split(None, 4)
        self.stamp = datetime.datetime.strptime(
            '%s %s' % (date, clock[:8]), '%Y-%m-%d %H:%M:%S')
        # Whatever follows "HH:MM:SS." is taken as milliseconds; a
        # timestamp with no fractional part is treated as .000.
        millis = clock[9:]
        self.stamp = self.stamp.replace(
            microsecond=int(millis) * 1000 if millis else 0)

        self.src, rest = packet.split('>', 1)
        path, self.payload = rest.split(':', 1)
        # partition() yields an empty path when there is no comma.
        self.dst, _, self.path = path.partition(',')
        self.fpath = ','.join(hop for hop in self.path.split(',')
                              if not hop.startswith('WIDE'))

    def _key(self):
        """Return the fields that define packet identity."""
        return (self.src, self.dst, self.payload)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        # Compare the actual fields rather than their hashes, so a hash
        # collision can never make two different packets look equal.
        if not isinstance(other, Packet):
            return NotImplemented
        return self._key() == other._key()

    def __str__(self):
        return '%s>%s via %s: %r' % (self.src, self.dst, self.iface,
                                     self.payload)

    def __repr__(self):
        return str(self)
def clean(interfaces):
    """Discard packets that waited more than five minutes for a match.

    ``interfaces`` maps an interface name to a mutable collection of
    packets, each with a naive ``stamp`` datetime.  Packets whose stamp
    is more than five minutes behind the current UTC time are removed
    in place, and a one-line summary of how many were dropped per
    interface is printed (the summary is printed even when nothing
    expired).

    NOTE(review): comparing against UTC assumes the log timestamps are
    UTC as well -- confirm against the logger's configuration.
    """
    # Naive UTC "now"; equivalent to the deprecated datetime.utcnow().
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    limit = datetime.timedelta(minutes=5)
    stats = {}
    for interface, packets in interfaces.items():
        # Collect first, then remove, so the collection is never
        # modified while it is being iterated.
        expired = [p for p in packets if now - p.stamp > limit]
        for packet in expired:
            packets.remove(packet)
        if expired:
            stats[interface] = len(expired)
    print('Expired unaccounted packets: %s' % (
        ' '.join('%s=%s' % (k, v)
                 for k, v in stats.items())))
def report(new_packet, old_packet):
    """Print how long after ``old_packet`` the matching ``new_packet`` came.

    The line is printed on a red background when the gap exceeds the
    allowance and in green otherwise.  The allowance is 5 seconds when
    both packets were logged on 'APRSIS' and 10 seconds as soon as
    either one came from any other interface.
    """
    elapsed = new_packet.stamp - old_packet.stamp
    seconds = elapsed.total_seconds()

    both_aprsis = (old_packet.iface == 'APRSIS'
                   and new_packet.iface == 'APRSIS')
    allowance = datetime.timedelta(seconds=5 if both_aprsis else 10)

    color = C.Back.RED if elapsed > allowance else C.Fore.GREEN

    # Show tenths of a second for short gaps, whole seconds otherwise.
    duration_format = '%.1f sec' if seconds < 10 else '%i sec'
    duration = duration_format % seconds

    fields = (
        new_packet.stamp.strftime('%H:%M:%S'),
        new_packet.src,
        new_packet.iface,
        duration,
        old_packet.iface,
        new_packet.fpath,
        new_packet.payload,
    )
    cprint(color, '%s %s via %s is %s after %s (%s:%s)' % fields)
def match(interfaces, packet):
    """Pair ``packet`` with equal packets already seen on other interfaces.

    ``interfaces`` maps an interface name to the set of still-unmatched
    packets heard on it.  Every interface other than ``packet.iface``
    is searched; for each one holding an equal packet, the pair is
    passed to ``report()`` and the stored packet is removed.

    Returns True if at least one match was found, False otherwise.
    """
    matched = False
    for interface, packets in interfaces.items():
        if interface == packet.iface or packet not in packets:
            continue
        # Membership only says an equal packet is stored; to report on
        # the stored object itself (with its own stamp and interface)
        # we have to walk the collection.  Stop at the first hit.
        old_packet = next(x for x in packets if x == packet)
        report(packet, old_packet)
        packets.remove(packet)
        matched = True
    return matched
def process_stream():
    """Read rflog lines from standard input and pair them up as they arrive.

    Each line is parsed into a Packet.  A packet that matches one
    already waiting on another interface is reported by ``match()``;
    otherwise it is stored under its own interface to wait for a
    partner.  Once more than 300 seconds have passed since the last
    sweep, ``clean()`` is called to drop packets that never matched.

    Returns when standard input reaches end-of-file.  Blank lines are
    ignored, and lines that Packet rejects with ValueError are reported
    on stderr and skipped instead of aborting the monitor.
    """
    interfaces = collections.defaultdict(set)
    last_clean = time.time()
    # Iterating stdin stops cleanly at EOF, where readline() would keep
    # returning '' forever.
    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue
        try:
            packet = Packet(line)
        except ValueError as err:
            print('Skipping unparsable line %r: %s' % (line, err),
                  file=sys.stderr)
            continue
        if not match(interfaces, packet):
            interfaces[packet.iface].add(packet)
        if time.time() - last_clean > 300:
            clean(interfaces)
            last_clean = time.time()
# Script entry point: start consuming rflog lines from standard input.
process_stream()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment