Skip to content

Instantly share code, notes, and snippets.

@hemna
Created October 11, 2023 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hemna/ad07b5299dddcf01b2ea11fd1d491d16 to your computer and use it in GitHub Desktop.
Save hemna/ad07b5299dddcf01b2ea11fd1d491d16 to your computer and use it in GitHub Desktop.
tail direwolf log and convert to aprsd Packet objects.
# Test script to test using asyncio to
# Tail a direwolf logfile looking for RX/TX APRS packets
# and then convert those to APRSD Packet objects.
#
#
# 1) create virtualenv
# python -m venv .venv
# 2) source venv
# source .venv/bin/activate
# 3) install aprsd and async_tail
# pip install aprsd async-tail
# 4) update DIREWOLF_LOG setting to point to your active direwolf.log
# 5) run it
# python test.py
import asyncio
from rich.console import Console
import re
import aprslib
from aprsd.packets import core
from async_tail import atail
DIREWOLF_LOG='/home/pi/direwolf.log'
async def main():
cs = Console()
cs.print("Starting up!")
async for line in atail(DIREWOLF_LOG):
if line[0]:
# Look for a packet RX on RF
search = re.search("^\[\d\.*\d*\] (.*)", line[0])
packetstring = None
if search:
cs.print("Found a X output!")
packetstring = search.group(1)
packetstring = packetstring.replace('<0x0d>','\x0d').replace('<0x1c>','\x1c').replace('<0x1e>','\x1e').replace('<0x1f>','\0x1f').replace('<0x0a>','\0x0a')
pkt_dict = aprslib.parse(packetstring)
packet = core.Packet.factory(pkt_dict)
cs.print(packet)
else:
# Look for a packet being TX on RF
search = re.search("^\[\d[HL]\] (.*)", line[0])
if search:
cs.print("Found a TX output!")
# We found a line we are Transmitting
packetstring = search.group(1)
packetstring = packetstring.replace('<0x0d>','\x0d').replace('<0x1c>','\x1c').replace('<0x1e>','\x1e').replace('<0x1f>','\0x1f').replace('<0x0a>','\0x0a')
pkt_dict = aprslib.parse(packetstring)
packet = core.Packet.factory(pkt_dict)
cs.print(packet)
else:
cs.print(f"Line doesn't match")
continue
#cs.print(line)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment