Skip to content

Instantly share code, notes, and snippets.

@michalfratczak
Created September 1, 2020 23:03
Show Gist options
  • Save michalfratczak/4ff0785b5f7c5da8a4273552097a1cf6 to your computer and use it in GitHub Desktop.
Save michalfratczak/4ff0785b5f7c5da8a4273552097a1cf6 to your computer and use it in GitHub Desktop.
fldigi loger
#!/bin/env python
'''
This script reads characters from fldigi and scans them for UKHAS sentences (see https://ukhas.org.uk/communication:protocol).
Every valid sentence is appended to a log file. Only sentences with a CRC16 checksum are supported; XOR checksums are not.
Run fldigi with these parameters
dl-fldigi.exe --hab --arq-server-address 127.0.0.1 --arq-server-port 7777
or
fldigi.exe --arq-server-address 127.0.0.1 --arq-server-port 7777
'''
from __future__ import print_function
import string
import socket
import select
import time
import re
import traceback
import threading
import queue
from datetime import datetime
def crc(i_str):
    '''
    Compute a CRC16 checksum (initial value 0xFFFF, polynomial 0x1021,
    processed most-significant-bit first) and return it as text.

    i_str: the sentence text. bytes/bytearray are consumed byte by byte;
           anything else is converted with str() and consumed per character.
    Returns a string of exactly four lowercase hexadecimal digits.
    '''
    if isinstance(i_str, (bytes, bytearray)):
        # bytearray yields integers on both Python 2 and Python 3
        codes = bytearray(i_str)
    else:
        codes = [ord(ch) for ch in str(i_str)]

    checksum = 0xffff
    for code in codes:
        checksum ^= code << 8
        for _ in range(8):
            if checksum & 0x8000:
                checksum = (checksum << 1) ^ 0x1021
            else:
                checksum <<= 1
            # keep the register 16 bits wide; without this the Python int
            # grows by one bit per shift and slows down on long inputs
            checksum &= 0xffff
    return '{:04x}'.format(checksum)
def make_connection(host='localhost', port='7777'):
    '''
    Open a TCP connection to the fldigi ARQ server.

    host: server host name or address.
    port: server port; anything int() accepts (a string is fine).

    Returns the socket object. On success it is connected and switched to
    non-blocking mode. If converting the port or connecting fails, the error
    is printed and the (unconnected) socket is still returned, so this
    function never raises for ordinary errors.
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, int(port)))
        sock.setblocking(0)
    except Exception:
        # best effort: report the problem and hand back the socket anyway
        print("fldigi2habboy: make_connection failed.")
        print(traceback.format_exc())
    except BaseException:
        # Ctrl-C / interpreter exit must not be swallowed; release the
        # socket before letting it propagate
        sock.close()
        raise
    return sock
def read_from_fldigi(sock, callback = None):
    '''
    Read data from sock in a loop and hand every received chunk to callback.

    sock:     socket to read from.
    callback: optional callable taking one argument, the chunk returned by
              sock.recv(); when None the data is read and discarded.

    Returns when the peer closes the connection or on KeyboardInterrupt.
    Any other exception (including one raised by callback) is printed and
    the loop continues.
    '''
    timeout_in_seconds = 10
    while True:
        try:
            readable, _, _ = select.select([sock], [], [], timeout_in_seconds)
            if not readable:
                print("Waiting for some fldigi data ...")
                continue
            chunk = sock.recv(4096)
            if not chunk:
                # recv() returning nothing means the peer closed the
                # connection; select() would report the socket readable
                # forever, so stop instead of spinning
                print("fldigi closed the connection.")
                return
            if callback:
                callback(chunk)
        except KeyboardInterrupt:
            return
        except Exception:
            print(traceback.format_exc())
class UkhasSentenceScanner(object):
    '''
    Incremental scanner for UKHAS sentences in a stream of characters.

    1. push() some characters into internal buffer
    2. for every sentence whose CRC16 checksum matches, run
       callback( (sentence_without_crc, crc_text) )

    The default callback is print.
    '''
    def __init__(self):
        self.__buff__ = ''
        # groups: 1 = text before the '$' run, 2 = sentence body,
        #         3 = '*', 4 = four checksum characters, 5 = trailing text
        self.__rex__ = re.compile( r'''(.*?)\$+(.+)(\*)(\w{4})(.*)''' )
        self.__sentence_callback__ = print

    def set_callback(self, cb):
        '''Replace the callable invoked for every valid sentence.'''
        self.__sentence_callback__ = cb

    def push(self, chars):
        '''
        Append chars to the buffer, scan it and run the callback once per
        valid sentence found.

        chars: text, or bytes/bytearray as returned by socket.recv().
        '''
        if isinstance(chars, (bytes, bytearray)) and not isinstance(chars, str):
            # latin-1 maps every byte to one character, so arbitrary radio
            # noise can never make the decode fail
            chars = chars.decode('latin-1')
        # print(chars, end='')
        if len(self.__buff__) > 5000:
            # nothing matched for a long time; drop the accumulated noise
            self.__buff__ = ''
        self.__buff__ += chars
        for sentence in self.__scan__():
            self.__sentence_callback__(sentence)

    def __scan__(self):
        '''
        Scan the buffer line by line.

        Returns a list of (sentence_without_crc, crc_text) tuples for the
        lines whose checksum is correct. After the call the buffer holds the
        last line if it did not match (it may still be incomplete), or is
        empty if the last line matched.
        '''
        sentences = []
        for line in self.__buff__.split('\n'):
            stripped = line.strip()
            mo = self.__rex__.match(stripped)
            if not mo:
                print('no MO', stripped)
                self.__buff__ = line #sentence might be not complete yet
                continue
            self.__buff__ = ''
            sentence_no_crc = mo.group(2)
            _crc = mo.group(4)
            computed = crc(sentence_no_crc)
            if computed.lower() == _crc.lower():
                print('CRC ok:', sentence_no_crc, _crc, computed )
                sentences.append( (sentence_no_crc, _crc) )
            else:
                # corrupted sentence: report it but do not pass it on
                print('CRC not valid:', sentence_no_crc, _crc, computed )
        return sentences
def log_to_file(line, filename = None):
    '''
    Append one timestamped entry to a log file.

    line:     text to record.
    filename: target file; when empty or None, a file named after the
              current date (YYYY_MM_DD_fldigi.txt) in the working directory
              is used.

    Each entry is written as a newline followed by
    "<date> <time> => <line>".
    '''
    target = filename
    if not target:
        target = './' + datetime.now().strftime('%Y_%m_%d_fldigi.txt')
    stamp = datetime.now().isoformat().replace('T', ' ')
    entry = ''.join(('\n', stamp, ' => ', line))
    with open(target, 'a') as log:
        log.write(entry)
def main():
    '''
    Wire everything together: create a sentence scanner whose callback logs
    "<sentence>*<crc>" to the log file, connect to fldigi with the default
    host and port, then feed every received chunk to the scanner until
    read_from_fldigi() returns.

    The progress messages are printed unconditionally.
    '''
    print("Scanner")
    scanner = UkhasSentenceScanner()

    def write_sentence(sent_and_crc):
        # sent_and_crc is the (sentence, crc) pair produced by the scanner
        log_to_file( sent_and_crc[0] + '*' + sent_and_crc[1] )

    scanner.set_callback(write_sentence)

    print("fldigi connection")
    con = make_connection()
    print("Connected to fldigi")

    print("Go !")
    read_from_fldigi(con, scanner.push)
# Start the logger only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment