Created
September 3, 2021 21:59
-
-
Save mobilinkd/a3cc9577d4ad5490a4d29c8a168a10c1 to your computer and use it in GitHub Desktop.
Viterbi Decoder for M17 Link Setup Frame
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import numpy as np | |
from commpy.channelcoding import Trellis, conv_encode, viterbi_decode | |
import binascii | |
import struct | |
import io | |
""" | |
This uses the debug output of the MMDVMHost M17_Trace branch.

NOTE: this requires that the commpy package be installed for
Viterbi decoding.

https://commpy.readthedocs.io/en/latest/

Pass in the 48 bytes of the deinterleaved LSF data on the command line
by copying the data from the 3 consecutive log lines, starting with the
line that contains the "55 F7" LSF sync word. The 46 bytes that follow
the sync word may also be passed on their own.

D: 2021-09-03 21:39:29.492 RX Deinterleaved M17 Frame
D: 2021-09-03 21:39:29.492 0000: 55 F7 DE 49 24 92 49 24 B6 DB 6D 4C 00 61 CF 8E *U..I$.I$..mL.a..*
D: 2021-09-03 21:39:29.492 0010: C5 C9 C4 C4 46 D6 ED 6E 00 00 00 00 00 00 00 00 *....F..n........*
D: 2021-09-03 21:39:29.493 0020: 00 00 00 00 00 00 00 00 00 00 00 00 29 DA 40 F4 *............).@.*
D: 2021-09-03 21:39:29.493 RX LSF without FEC
...

Run the command:

$ ./viterbi_decode.py 55 F7 DE 49 24 92 49 24 B6 DB 6D 4C 00 61 CF 8E C5 C9 C4 C4 46 \
D6 ED 6E 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 29 DA 40 F4

You should see output which looks like this:

b'ffffffffffff001b648adad705050000000000000000000000000000946a'
Checksum OK
Decoded callsign: WX9O-11
""" | |
class CRC16:
    """Bit-serial, MSB-first CRC over a 16-bit shift register.

    Message bits are shifted straight into the register by ``crc()``; the
    16 trailing zero bits that complete the division are applied by
    ``get()`` on a copy, so data may be fed in several calls and the
    checksum read at any point without disturbing the running state.

    Attributes
    ----------
    poly : int
        Generator polynomial XORed into the register.
    init : int
        Value the register is derived from on ``reset()``.
    mask : int
        0xFFFF, keeps the register 16 bits wide.
    msb : int
        0x8000, the top bit of the register.
    reg : int
        Current register contents.
    """

    def __init__(self, poly, init):
        self.poly, self.init = poly, init
        self.mask, self.msb = 0xFFFF, 0x8000
        self.reset()

    def reset(self):
        """Reload the register from ``init``.

        The initial value is stepped 16 times in the opposite direction to
        the zero-bit shifts performed by ``get()``: whenever the low bit is
        set the polynomial is XORed out and the top bit restored.
        """
        state = self.init
        for _ in range(16):
            if state & 0x01:
                state = ((state ^ self.poly) >> 1) | self.msb
            else:
                state >>= 1
        self.reg = state & self.mask

    def crc(self, data):
        """Shift every bit of ``data`` (an iterable of byte values) into the
        register, most significant bit of each byte first."""
        for octet in data:
            for shift in range(7, -1, -1):
                carry = self.reg & self.msb
                incoming = (octet >> shift) & 0x01
                self.reg = ((self.reg << 1) & self.mask) | incoming
                if carry:
                    self.reg ^= self.poly

    def get(self):
        """Return the checksum for the data fed so far.

        Works on a copy of the register: 16 zero bits are shifted through
        it and the stored state is left untouched.
        """
        value = self.reg
        for _ in range(16):
            top_set = value & self.msb
            value = (value << 1) & self.mask
            if top_set:
                value ^= self.poly
        return value & self.mask

    def get_bytes(self):
        """Return the checksum as a two-byte bytearray, high byte first."""
        checksum = self.get()
        return bytearray(((checksum >> 8) & 0xFF, checksum & 0xFF))
# Shared CRC-16 instance (polynomial 0x5935, initial value 0xFFFF).  The
# script at the bottom of the file feeds it the decoded frame, trailing
# checksum bytes included, and expects a result of zero.
crc = CRC16(poly=0x5935, init=0xFFFF)
def decode_callsign_base40(encoded_bytes):
    """Decode a 6-byte, big-endian base-40 value into callsign text.

    The six bytes are read as one 48-bit unsigned integer.  Base-40 digits
    are then taken least significant first and emitted in that order, so
    the first character of the result comes from the lowest digit.  A value
    of zero produces an empty string.

    Parameters
    ----------
    encoded_bytes : bytes-like
        Exactly six bytes; ``struct.unpack`` raises ``struct.error`` for any
        other length.

    Returns
    -------
    str
        The decoded characters.
    """
    alphabet = "xABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/."

    # Assemble the 48-bit value from a 32-bit and a 16-bit big-endian field.
    upper, lower = struct.unpack(">IH", encoded_bytes)
    remaining = (upper << 16) | lower

    # Peel off each base-40 digit and map it to the appropriate character.
    characters = []
    while remaining:
        remaining, digit = divmod(remaining, 40)
        characters.append(alphabet[digit])
    return "".join(characters)
def depuncture(punctured: np.ndarray, punct_vec: np.ndarray, shouldbe: int) -> np.ndarray:
    """
    Re-insert zeros at the positions that puncturing removed.

    The puncturing vector is applied cyclically over the output: where it
    holds 1 the next value of ``punctured`` is copied, everywhere else the
    output stays 0.

    Parameters
    ----------
    punctured : 1D ndarray
        Punctured message (hard bits or soft values).
    punct_vec : 1D ndarray
        Puncturing vector {0,1} bit array; 1 marks a transmitted position.
    shouldbe : int
        Length of the initial message (before puncturing).

    Returns
    -------
    depunctured : 1D ndarray
        Output vector of length ``shouldbe``.  Integer or boolean input
        yields an ``int`` array; any other dtype (e.g. float soft
        decisions) is preserved rather than truncated to integers.

    Raises
    ------
    IndexError
        If ``punctured`` holds fewer values than the pattern consumes.
    """
    source = np.asarray(punctured)
    # Keep the historical int output for bit-like input, but do not squash
    # fractional soft values into integers.
    out_dtype = int if source.dtype.kind in "biu" else source.dtype
    period = len(punct_vec)
    depunctured = np.zeros((shouldbe,), out_dtype)
    next_symbol = 0  # index of the next unread value in `source`
    for position in range(shouldbe):
        if punct_vec[position % period] == 1:
            depunctured[position] = source[next_symbol]
            next_symbol += 1
    return depunctured
def decode_link_setup(frame):
    """Depuncture and Viterbi-decode a link setup frame.

    Parameters
    ----------
    frame : array-like of {0,1}
        Received (punctured) frame bits.

    Returns
    -------
    The result of commpy's ``viterbi_decode`` for the 488 depunctured
    symbols; the caller strips the trailing flush bits.
    """
    # Convolutional code: one memory register of length 4 with generator
    # polynomials 31 and 27 (octal).
    generators = np.array([[0o31, 0o27]])
    trellis = Trellis(np.array([4]), generators)

    # Puncture pattern for the link setup frame: a leading 1 followed by
    # fifteen repetitions of 1,0,1,1.
    puncture_pattern = [1] + 15 * [1, 0, 1, 1]

    # Map bits 0/1 onto -1/+1 for soft decoding; the zeros re-inserted by
    # depuncturing then lie midway between the two levels.
    soft_bits = 2 * np.array(frame) - 1
    restored = depuncture(soft_bits, puncture_pattern, 488)
    return viterbi_decode(restored, trellis, decoding_type='soft', tb_depth=25)
def to_bit_array(byte_array):
    """Convert byte array to big-endian bit array.

    Each byte contributes eight entries, most significant bit first.

    Parameters
    ----------
    byte_array : iterable of int
        Byte values, e.g. a ``bytes`` or ``bytearray`` object.

    Returns
    -------
    1D ndarray of int
        Bits as 0/1.  The dtype is a signed integer so that arithmetic such
        as ``bits * 2 - 1`` cannot wrap around.  Empty input gives an empty
        array instead of raising.
    """
    bits = [
        (value >> shift) & 1
        for value in byte_array
        for shift in range(7, -1, -1)
    ]
    return np.array(bits, dtype=int)
def bits_to_bytes(bits):
    """Fold a sequence of bits, most significant first, into one integer.

    Despite the name, the result is a single integer value rather than a
    bytes object; an empty sequence gives 0.
    """
    value = 0
    for bit in bits:
        value = (value << 1) | bit
    return value
def bitarray_to_bytes(bits):
    """Pack a bit array into bytes, most significant bit of each byte first.

    Parameters
    ----------
    bits : array-like of {0,1}
        Bit values; the length must be a multiple of 8.

    Returns
    -------
    bytearray
        One byte per group of eight bits.  Empty input gives an empty
        bytearray.

    Raises
    ------
    ValueError
        If the number of bits is not a multiple of 8.
    """
    bit_values = np.asarray(bits)
    if bit_values.size % 8:
        raise ValueError(
            f"bit count must be a multiple of 8, got {bit_values.size}")
    # Comparing against zero yields a boolean array, which packbits accepts
    # whatever the input dtype was (including an empty array).
    return bytearray(np.packbits(bit_values != 0))
# Command-line handling: argv holds the script name followed by the hex
# bytes copied from the log.
argument_count = len(sys.argv)
if argument_count == 47:
    # 46 bytes: the frame without the two sync-word bytes.
    debug_data = ''.join(sys.argv[1:])
elif argument_count == 49:
    # 48 bytes: skip the two leading sync-word bytes ("55 F7").
    debug_data = ''.join(sys.argv[3:])
else:
    if argument_count > 1:
        # Arguments were given but cannot be a frame; say so instead of
        # silently decoding the sample and passing it off as the user's data.
        print(f"warning: expected 46 or 48 hex bytes, got {argument_count - 1}; "
              "decoding the built-in sample frame instead", file=sys.stderr)
    debug_data = "DE 49 24 92 49 24 B6 DB 6D 4C 00 61 CF 8E C5 C9 C4 C4 46 D6 ED 6E 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 29 DA 40 F4".replace(' ', '')

frame = to_bit_array(binascii.unhexlify(debug_data))
decoded = decode_link_setup(frame)[:-4]  # Drop flush bits
decoded_bytes = bitarray_to_bytes(decoded)
print(binascii.hexlify(decoded_bytes))

# The decoded frame ends with its own checksum, so running the CRC over the
# whole frame must leave zero.
crc.crc(decoded_bytes)
print("Checksum OK" if crc.get() == 0 else "Checksum BAD")

# Bytes 6..11 of the decoded frame hold the base-40 encoded callsign.
print(f'Decoded callsign: {decode_callsign_base40(decoded_bytes[6:12])}')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment