Skip to content

Instantly share code, notes, and snippets.

@mobilinkd
Created September 3, 2021 21:59
Show Gist options
  • Save mobilinkd/a3cc9577d4ad5490a4d29c8a168a10c1 to your computer and use it in GitHub Desktop.
Save mobilinkd/a3cc9577d4ad5490a4d29c8a168a10c1 to your computer and use it in GitHub Desktop.
Viterbi Decoder for M17 Link Setup Frame
#!/usr/bin/env python3
import sys
import numpy as np
from commpy.channelcoding import Trellis, conv_encode, viterbi_decode
import binascii
import struct
import io
"""
This uses the debug output of the MMDVMHost M17_Trace branch.
NOTE: this requires that the commpy package be installed for
Viterbi decoding.
https://commpy.readthedocs.io/en/latest/
Pass in the 48 bytes of the deinterleaved LSF data on the command-line
by copying the data from the 3 consecutive log lines containing the
"55 F7" LSF sync word.
D: 2021-09-03 21:39:29.492 RX Deinterleaved M17 Frame
D: 2021-09-03 21:39:29.492 0000: 55 F7 DE 49 24 92 49 24 B6 DB 6D 4C 00 61 CF 8E *U..I$.I$..mL.a..*
D: 2021-09-03 21:39:29.492 0010: C5 C9 C4 C4 46 D6 ED 6E 00 00 00 00 00 00 00 00 *....F..n........*
D: 2021-09-03 21:39:29.493 0020: 00 00 00 00 00 00 00 00 00 00 00 00 29 DA 40 F4 *............).@.*
D: 2021-09-03 21:39:29.493 RX LSF without FEC
...
Run the command:
$ ./viteri_decode.py 55 F7 DE 49 24 92 49 24 B6 DB 6D 4C 00 61 CF 8E C5 C9 C4 C4 46 \
D6 ED 6E 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 29 DA 40 F4
You should see output which looks like this:
b'ffffffffffff001b648adad705050000000000000000000000000000946a'
Checksum OK
Decoded callsign: WX9O-11
"""
class CRC16(object):
    """Bit-serial 16-bit CRC computed over an augmented message.

    Message bits are shifted into the low end of a 16-bit register, most
    significant bit of each byte first.  ``get()`` finishes the calculation
    by shifting in 16 further zero bits on a copy of the register, so more
    data can still be appended with ``crc()`` afterwards.
    """

    def __init__(self, poly, init):
        """Store the generator polynomial and initial value, then reset.

        Parameters
        ----------
        poly : int
            Generator polynomial (without the implicit top bit).
        init : int
            CRC value that ``get()`` should report before any data is fed.
        """
        self.poly = poly
        self.init = init
        self.mask = 0xFFFF
        self.msb = 0x8000
        self.reset()

    def reset(self):
        """Return the register to its starting state.

        The register is loaded with ``init`` and then stepped *backwards*
        16 times.  Each backward step undoes one forward step of ``get()``
        (this inversion relies on the polynomial having its low bit set),
        so the 16 zero bits appended by ``get()`` bring an empty message
        back to ``init``.
        """
        self.reg = self.init
        for _ in range(16):
            low_bit = self.reg & 0x01
            if low_bit:
                # A set low bit means the forward step XORed in the
                # polynomial after shifting out a 1: undo both.
                self.reg = ((self.reg ^ self.poly) >> 1) | self.msb
            else:
                self.reg >>= 1
        self.reg &= self.mask

    def crc(self, data):
        """Feed an iterable of byte values (0-255) into the register."""
        for byte in data:
            for shift in range(7, -1, -1):
                carry = self.reg & self.msb
                incoming = (byte >> shift) & 0x01
                self.reg = ((self.reg << 1) & self.mask) | incoming
                if carry:
                    self.reg ^= self.poly

    def get(self):
        """Return the CRC of the data fed so far without altering state."""
        value = self.reg
        # Flush with 16 zero bits to complete the polynomial division.
        for _ in range(16):
            carry = value & self.msb
            value = (value << 1) & self.mask
            if carry:
                value ^= self.poly
        return value & self.mask

    def get_bytes(self):
        """Return the current CRC as a 2-byte big-endian ``bytearray``."""
        value = self.get()
        return bytearray([(value >> 8) & 0xFF, value & 0xFF])
# Shared CRC-16 instance (polynomial 0x5935, initial value 0xFFFF) used by
# the script section below to verify the decoded frame.
crc = CRC16(poly=0x5935, init=0xFFFF)
def decode_callsign_base40(encoded_bytes):
    """Decode a 6-byte big-endian base-40 value into a callsign string.

    Parameters
    ----------
    encoded_bytes : bytes-like
        Exactly 6 bytes (``struct.unpack`` raises ``struct.error``
        otherwise).

    Returns
    -------
    str
        Decoded characters, least-significant base-40 digit first.  A
        value of zero yields the empty string.
    """
    # Read the 48-bit value as a 32-bit high part and a 16-bit low part.
    high, low = struct.unpack(">IH", encoded_bytes)
    value = (high << 16) | low

    # Peel off each base-40 digit and map it to the appropriate character.
    alphabet = "xABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-/."
    chars = []
    while value:
        value, digit = divmod(value, 40)
        chars.append(alphabet[digit])
    return "".join(chars)
def depuncture(punctured: np.ndarray, punct_vec: np.ndarray, shouldbe: int) -> np.ndarray:
    """
    Re-insert zeros at the positions removed by puncturing.

    The puncturing vector is repeated cyclically over the output.  Where it
    is 1 the next received symbol is copied; elsewhere the output stays 0.

    Parameters
    ----------
    punctured : 1D array_like
        Received (punctured) symbols.  May hold hard bits or soft-decision
        values; float values are preserved rather than truncated.
    punct_vec : 1D array_like
        Puncturing vector {0,1}; 1 marks a transmitted position.
    shouldbe : int
        Length of the message before puncturing (length of the result).

    Returns
    -------
    depunctured : 1D ndarray
        Array of length ``shouldbe``.  Integer dtype for integer or boolean
        input, floating dtype for floating input.

    Raises
    ------
    ValueError
        If ``punct_vec`` is empty while ``shouldbe`` is positive.
    IndexError
        If ``punctured`` holds fewer symbols than the pattern requires.
    """
    received = np.asarray(punctured)
    pattern = np.asarray(punct_vec)
    if shouldbe > 0 and pattern.size == 0:
        raise ValueError("punct_vec must not be empty")

    # Tile the puncturing pattern across the full output length.
    keep = np.resize(pattern, shouldbe) == 1
    needed = int(np.count_nonzero(keep))
    if needed > received.size:
        raise IndexError(
            f"punctured input has {received.size} symbols but the pattern "
            f"requires {needed}")

    # Promote with int so hard-bit inputs still give an integer array while
    # soft (float) inputs keep their fractional values.
    out_dtype = np.result_type(received.dtype, np.dtype(int))
    depunctured = np.zeros((shouldbe,), out_dtype)
    # Surplus trailing input symbols are ignored.
    depunctured[keep] = received[:needed]
    return depunctured
def decode_link_setup(frame):
    """Depuncture and Viterbi-decode a link setup frame.

    Parameters
    ----------
    frame : array_like of {0,1}
        Received (punctured) bits of the frame.

    Returns
    -------
    The output of ``viterbi_decode`` for the depunctured frame.
    """
    # Puncture pattern for the link setup frame: 61 entries, applied
    # cyclically 8 times to cover the 488 depunctured symbols.
    puncture_pattern = [1] + 15 * [1, 0, 1, 1]

    # Convolutional code: memory 4, generator polynomials 31 and 27 (octal).
    generators = np.array([[0o31, 0o27]])
    code_trellis = Trellis(np.array([4]), generators)

    # Map bits 0/1 to -1/+1 for soft decoding; depunctured positions stay 0.
    soft_bits = 2 * np.array(frame) - 1
    restored = depuncture(soft_bits, puncture_pattern, 488)
    return viterbi_decode(restored, code_trellis, tb_depth=25, decoding_type='soft')
def to_bit_array(byte_array):
    """Convert byte array to big-endian bit array.

    Parameters
    ----------
    byte_array : iterable of int
        Byte values; only the low 8 bits of each value are used.

    Returns
    -------
    1D ndarray of int
        Eight {0,1} entries per input byte, most significant bit first.
        Empty input yields an empty array.
    """
    bits = [
        (value >> shift) & 1
        for value in byte_array
        for shift in range(7, -1, -1)
    ]
    # Build from a flat list so that empty input gives an empty array
    # (np.concatenate refuses an empty sequence).  The explicit int dtype
    # keeps later signed arithmetic such as ``bits * 2 - 1`` from wrapping.
    return np.array(bits, dtype=int)
def bits_to_bytes(bits):
    """Fold a sequence of {0,1} values into one integer, MSB first.

    Despite the name, the result is a single integer rather than a bytes
    object.  An empty sequence gives 0.
    """
    value = 0
    for bit in bits:
        # Make room for the next bit, then place it in the low position.
        value = (value << 1) | bit
    return value
def bitarray_to_bytes(bits):
    """Pack a {0,1} bit array into bytes, most significant bit first.

    Parameters
    ----------
    bits : array_like
        Bit values; the length must be a multiple of 8.  Any nonzero entry
        is treated as a 1.

    Returns
    -------
    bytearray
        One byte per group of 8 bits.  Empty input yields an empty
        ``bytearray``.

    Raises
    ------
    ValueError
        If the number of bits is not a multiple of 8.
    """
    bit_values = np.asarray(bits)
    if bit_values.size == 0:
        return bytearray()
    if bit_values.size % 8:
        raise ValueError(
            f"bit array length {bit_values.size} is not a multiple of 8")
    # np.packbits packs big-endian within each byte by default.
    return bytearray(np.packbits(bit_values != 0).tobytes())
# Select the hex input.  With 48 byte arguments the first two (the sync
# word) are skipped; with 46 byte arguments everything is used; with any
# other argument count the built-in sample frame is decoded instead.
if len(sys.argv) == 49:
    debug_data = ''.join(sys.argv[3:])
elif len(sys.argv) == 47:
    debug_data = ''.join(sys.argv[1:])
else:
    debug_data = "DE 49 24 92 49 24 B6 DB 6D 4C 00 61 CF 8E C5 C9 C4 C4 46 D6 ED 6E 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 29 DA 40 F4".replace(' ', '')

# Hex text -> bytes -> bit array.
frame = to_bit_array(binascii.unhexlify(debug_data))

# Decode, then drop the 4 trailing flush bits.
decoded = decode_link_setup(frame)[:-4]
decoded_bytes = bitarray_to_bytes(decoded)
print(binascii.hexlify(decoded_bytes))

# The decoded bytes are fed through the CRC whole; a result of zero is
# reported as a good checksum.
crc.crc(decoded_bytes)
print("Checksum OK" if crc.get() == 0 else "Checksum BAD")

# Bytes 6..11 of the decoded frame are decoded as the callsign.
print(f'Decoded callsign: {decode_callsign_base40(decoded_bytes[6:12])}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment