Skip to content

Instantly share code, notes, and snippets.

@hemna
Created February 25, 2021 18:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hemna/b0449a4e015f5f5c24032998cd3c253c to your computer and use it in GitHub Desktop.
Save hemna/b0449a4e015f5f5c24032998cd3c253c to your computer and use it in GitHub Desktop.
import aprslib
import asyncio
import signal
import logging
from aioax25 import kiss as kiss
from aioax25.aprs import APRSInterface
from aioax25 import interface
from aioax25 import frame as axframe
import re
from aprsd import messaging
import sys
# Module-wide logger used by every function in this script.
LOG = logging.getLogger(__name__)

# AX.25 interface handle: assigned in main(), read by the receive callback.
ax25int = None

# Level names accepted by setup_logging(), mapped to logging's numeric levels.
LOG_LEVELS = {
    level_name: getattr(logging, level_name)
    for level_name in ("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG")
}
def setup_logging(loglevel):
    """Route this script's and ``aioax25.frame``'s log records to stdout.

    Safe to call more than once: the level is always updated, but a stdout
    handler is attached to each logger only if it does not already have one,
    so repeated calls do not duplicate every log line.

    Args:
        loglevel: A key of ``LOG_LEVELS`` ("CRITICAL", "ERROR", "WARNING",
            "INFO" or "DEBUG").

    Raises:
        KeyError: If ``loglevel`` is not a key of ``LOG_LEVELS``.
    """
    log_level = LOG_LEVELS[loglevel]

    log_format = (
        "[%(asctime)s][%(threadName)-12s][%(levelname)-5.5s]"
        " [%(pathname)s.%(funcName)s:%(lineno)d]"
        " %(message)s"
    )
    date_format = "%m/%d/%Y %I:%M:%S %p"
    sh = logging.StreamHandler(sys.stdout)
    sh.setFormatter(logging.Formatter(fmt=log_format, datefmt=date_format))

    # Wiring for the "kiss.classes" logger was present but disabled:
    # kiss_logger = logging.getLogger("kiss.classes")
    # kiss_logger.setLevel(log_level)
    # kiss_logger.addHandler(sh)
    for logger in (LOG, logging.getLogger("aioax25.frame")):
        logger.setLevel(log_level)
        already_on_stdout = any(
            isinstance(handler, logging.StreamHandler)
            and getattr(handler, "stream", None) is sys.stdout
            for handler in logger.handlers
        )
        if not already_on_stdout:
            logger.addHandler(sh)
# Compiled once at import time. IGNORECASE so that "SHIT" or "Bitch" are
# masked exactly like their lower-case spellings.
_PROFANITY_RE = re.compile("fuck|shit|cunt|piss|cock|bitch", re.IGNORECASE)


def _filter_for_send(message):
    """Filter and format message string for FCC.

    The message is truncated to 67 characters and then every occurrence of a
    listed word, in any letter case, is replaced by ``"****"``.

    Args:
        message: Text to be sent.

    Returns:
        The truncated, masked text (never longer than 67 characters, since
        no replacement is longer than the word it replaces).
    """
    # max?  ftm400 displays 64, raw msg shows 74
    # and ftm400-send is max 64.  setting this to
    # 67 displays 64 on the ftm400. (+3 {01 suffix)
    # feature req: break long ones into two msgs
    message = message[:67]
    # We all miss George Carlin
    return _PROFANITY_RE.sub("****", message)
def _build_msg_payload(msg, id):
    """Build raw string to send over the air.

    The result is the filtered message text, a literal ``{``, the string
    form of ``id`` and a trailing newline.
    """
    filtered_text = _filter_for_send(msg)
    return "".join((filtered_text, "{", str(id), "\n"))
def p(interface, frame, match):
    """Log a received APRS frame, then transmit a fixed "TEST" reply.

    Registered in main() as the callback for frames matching the bound
    callsign. The reply is sent through the module-level ``ax25int``.

    Args:
        interface: Passed by the caller; not used here.
        frame: Received frame; its ``header`` and ``payload`` are read.
        match: Passed by the caller; not used here.
    """
    LOG.debug("Got an APRS frame")
    LOG.debug(repr(frame))
    LOG.debug(frame.header)
    LOG.debug(frame.payload)

    # Rebuild the "<header>:<payload>" text form and hand it to aprslib.
    rx_text = "{}:{}".format(str(frame.header), str(frame.payload.decode()))
    LOG.debug(aprslib.parse(rx_text))

    reply_payload = _build_msg_payload("TEST", 11).encode("UTF-8")

    # now build an ax.25 frame
    # Addresses are passed as text, and the digipeater path as a list of
    # text callsigns. The previous code passed UTF-8 bytes for both.
    # NOTE(review): aioax25 is understood to decode bytes addresses as the
    # on-air binary form and to iterate ``repeaters`` element by element -
    # confirm against the installed aioax25 version.
    reply = axframe.AX25UnnumberedInformationFrame(
        "APZ100",
        "WB4BOR-1",
        pid=0xF0,
        repeaters=["WIDE2-1"],
        payload=reply_payload,
    )
    LOG.debug("Send new frame?")
    LOG.debug(reply)
    ax25int.transmit(reply)
def main():
    """Open the KISS TCP device, bind the APRS callback and run the loop.

    Sets the module-level ``ax25int`` so the receive callback can transmit.
    On SIGINT, SIGTERM or SIGQUIT the KISS device is closed and the event
    loop is stopped five seconds later; the loop is closed on the way out.
    """
    global ax25int

    # Create and install the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no loop is running. Installing it as the current loop
    # keeps it visible to code that looks the loop up itself.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    setup_logging("DEBUG")

    kissdev = kiss.TCPKISSDevice("192.168.1.7", 8001, log=LOG, loop=loop)
    kissdev.open()

    ax25int = interface.AX25Interface(
        kissport=kissdev[0],
        loop=loop,
        log=LOG,
    )
    aprsint = APRSInterface(
        ax25int=ax25int,
        mycall="WB4BOR",
        log=LOG,
    )
    aprsint.bind(callback=p, callsign="APN382", regex=True)

    def _shutdown():
        LOG.debug("shutdown signal")
        kissdev.close()
        # Stop the loop after a five second delay rather than immediately.
        loop.call_later(5, loop.stop)

    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
        loop.add_signal_handler(sig, _shutdown)

    LOG.debug("Main loop")
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the loop's resources once it has stopped.
        loop.close()
# Call main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment