Created
February 25, 2021 18:20
-
-
Save hemna/b0449a4e015f5f5c24032998cd3c253c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aprslib | |
import asyncio | |
import signal | |
import logging | |
from aioax25 import kiss as kiss | |
from aioax25.aprs import APRSInterface | |
from aioax25 import interface | |
from aioax25 import frame as axframe | |
import re | |
from aprsd import messaging | |
import sys | |
# Module-wide logger used by every function in this script.
LOG = logging.getLogger(__name__)

# AX.25 interface handle; assigned in main() and used by the frame callback.
ax25int = None

# Level names accepted by setup_logging(), mapped to logging's numeric levels.
LOG_LEVELS = {
    level_name: getattr(logging, level_name)
    for level_name in ("CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG")
}
def setup_logging(loglevel):
    """Route this script's and ``aioax25.frame``'s log records to stdout.

    ``loglevel`` must be a key of ``LOG_LEVELS`` (for example ``"DEBUG"``);
    any other value raises ``KeyError``.
    """
    level = LOG_LEVELS[loglevel]

    # A single stdout handler is shared by both loggers configured below.
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter(
            fmt=(
                "[%(asctime)s][%(threadName)-12s][%(levelname)-5.5s]"
                " [%(pathname)s.%(funcName)s:%(lineno)d]"
                " %(message)s"
            ),
            datefmt="%m/%d/%Y %I:%M:%S %p",
        )
    )

    # The "kiss.classes" logger used to be configured here as well; that
    # wiring is currently disabled.
    for target in (LOG, logging.getLogger("aioax25.frame")):
        target.setLevel(level)
        target.addHandler(stdout_handler)
def _filter_for_send(message):
    """Filter and format message string for FCC.

    The text is cut to 67 characters and a fixed list of profanities is
    masked with ``****``, regardless of letter case.

    Args:
        message: Text to be sent. ``None`` or an empty string yields ``""``.

    Returns:
        The truncated, censored message string.
    """
    if not message:
        return ""
    # max? ftm400 displays 64, raw msg shows 74
    # and ftm400-send is max 64. setting this to
    # 67 displays 64 on the ftm400. (+3 {01 suffix)
    # feature req: break long ones into two msgs
    message = message[:67]
    # We all miss George Carlin
    # IGNORECASE so "SHIT" / "Shit" are masked just like "shit".
    return re.sub(
        "fuck|shit|cunt|piss|cock|bitch",
        "****",
        message,
        flags=re.IGNORECASE,
    )
def _build_msg_payload(msg, id):
    """Build raw string to send over the air.

    The result is the filtered message text, a literal ``{``, the message
    id rendered with ``str()``, and a trailing newline.
    """
    filtered_text = _filter_for_send(msg)
    return filtered_text + "{" + str(id) + "\n"
def p(interface, frame, match):
    """Log a received APRS frame, then transmit a fixed "TEST" reply.

    This is the callback handed to ``APRSInterface.bind`` in ``main()``.

    Args:
        interface: Passed in by the binding; not used here.
        frame: The received frame; its ``header`` and ``payload`` are read.
        match: Passed in by the binding; not used here.

    Frames whose payload cannot be decoded or parsed are logged and
    dropped without sending a reply.
    """
    LOG.debug("Got an APRS frame")
    LOG.debug(repr(frame))
    LOG.debug(frame.header)
    LOG.debug(frame.payload)

    # Rebuild the "HEADER:payload" text form so aprslib can parse it.
    try:
        text = str(frame.payload.decode())
        aprs_msg = aprslib.parse("{}:{}".format(str(frame.header), text))
    except (UnicodeDecodeError, aprslib.ParseError, aprslib.UnknownFormat) as ex:
        LOG.error("Ignoring frame that could not be parsed: %s", ex)
        return
    LOG.debug(aprs_msg)

    # ax25int is assigned by main(); without it there is nothing to send on.
    if ax25int is None:
        LOG.error("AX.25 interface is not set up; cannot transmit")
        return

    new_msg = "TEST"
    msg_id = 11
    payload = _build_msg_payload(new_msg, msg_id).encode("UTF-8")

    # now build an ax.25 frame
    # NOTE(review): callsigns are passed as str and the digipeater path as a
    # list of str. aioax25 appears to interpret bytes addresses as the
    # already-encoded binary AX.25 address field, and iterates ``repeaters``
    # as a sequence of addresses -- confirm against the aioax25 frame docs.
    reply = axframe.AX25UnnumberedInformationFrame(
        "APZ100",
        "WB4BOR-1",
        pid=0xF0,
        repeaters=["WIDE2-1"],
        payload=payload,
    )
    LOG.debug("Send new frame?")
    LOG.debug(reply)
    ax25int.transmit(reply)
def main():
    """Connect to a KISS TNC over TCP and answer APRS frames until signalled.

    Sets up logging, opens the KISS device, builds the AX.25 and APRS
    interfaces on top of its first port, binds ``p`` as the receive callback
    and runs the event loop until SIGINT, SIGTERM or SIGQUIT arrives.

    Side effects:
        Assigns the module-level ``ax25int`` so the callback can transmit.
    """
    global ax25int

    # asyncio.get_event_loop() is deprecated when no loop is running, so the
    # loop is created and installed explicitly instead.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    setup_logging("DEBUG")

    kissdev = kiss.TCPKISSDevice("192.168.1.7", 8001, log=LOG, loop=loop)
    kissdev.open()
    kissport0 = kissdev[0]

    ax25int = interface.AX25Interface(
        kissport=kissport0,
        loop=loop,
        log=LOG,
    )
    aprsint = APRSInterface(
        ax25int=ax25int,
        mycall="WB4BOR",
        log=LOG,
    )
    aprsint.bind(callback=p, callsign="APN382", regex=True)

    def _shutdown():
        """Close the KISS device and stop the loop five seconds later."""
        LOG.debug("shutdown signal")
        kissdev.close()
        # Delayed stop; presumably gives the device time to finish closing.
        loop.call_later(5, loop.stop)

    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT):
        loop.add_signal_handler(sig, _shutdown)

    LOG.debug("Main loop")
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Deliberate best-effort: a Ctrl-C that bypasses the signal handler
        # simply ends the program.
        pass
    finally:
        # Release the loop's resources once it has stopped running.
        loop.close()
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment