Created
June 18, 2023 02:11
-
-
Save ashafq/717f3992ad9d04789b71fec8fd919f30 to your computer and use it in GitHub Desktop.
KCS Decoder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
"""A simple KCS decoder | |
Copyright (c) 2023 Ayan Shafat <ayan.x.shafqat@gmail.com> | |
""" | |
from sys import argv, stdout | |
import numpy as np | |
import scipy.io.wavfile as wav | |
# Tone frequencies used by decode_symbols to classify each frame. They are
# divided by the WAV sample rate, so they are in the same unit (Hz).
ZERO_FREQ = 1200  # a frame dominated by this tone decodes as a "0" symbol
ONE_FREQ = 2400  # a frame dominated by this tone decodes as a "1" symbol
def zero_pad(x: np.ndarray, desired_length: int) -> np.ndarray:
    """Right-pad ``x`` with zeros until it holds ``desired_length`` items.

    An array that is already at least ``desired_length`` long is returned
    as-is (the very same object, not a copy).
    """
    shortfall = desired_length - len(x)
    if shortfall <= 0:
        return x
    # Pad only at the end: zero items before, ``shortfall`` items after.
    return np.pad(x, (0, shortfall), 'constant')
def decode_symbols(samples: np.ndarray, sample_rate: int, *,
                   zero_freq=None, one_freq=None) -> bytes:
    """Demodulate KCS audio into a stream of ASCII bit symbols.

    The signal is cut into consecutive, non-overlapping frames that each span
    four cycles of the "zero" tone. Every frame is Hann-windowed and
    transformed with a 256-point FFT; the magnitudes of the bins closest
    (rounded down) to the two tone frequencies are then compared.

    Args:
        samples: One-dimensional array of audio samples.
        sample_rate: Sampling rate of ``samples`` in Hz.
        zero_freq: Frequency of the "0" tone in Hz. Defaults to ``ZERO_FREQ``.
        one_freq: Frequency of the "1" tone in Hz. Defaults to ``ONE_FREQ``.

    Returns:
        One byte per complete frame: ``b"0"`` or ``b"1"`` for whichever tone
        is stronger, ``b"E"`` when both magnitudes are equal (e.g. silence).
        Trailing samples that do not fill a whole frame are ignored, and an
        input shorter than one frame yields ``b""``.

    Raises:
        ValueError: If ``sample_rate`` is so low that a frame would be empty.
    """
    if zero_freq is None:
        zero_freq = ZERO_FREQ
    if one_freq is None:
        one_freq = ONE_FREQ

    block_size = 4 * sample_rate // zero_freq
    if block_size < 1:
        raise ValueError("sample_rate is too low for the symbol length")
    num_blocks = samples.shape[0] // block_size

    fft_size = 256
    zero_bin_idx = zero_freq * fft_size // sample_rate
    one_bin_idx = one_freq * fft_size // sample_rate
    window = np.hanning(block_size)

    symbols = bytearray()
    # Step through fixed-size frames. np.array_split would instead spread the
    # leftover samples over the frames, making some longer than the window.
    for start in range(0, num_blocks * block_size, block_size):
        frame = samples[start:start + block_size] * window
        spectrum = np.fft.fft(frame, fft_size)
        zero_magnitude = np.abs(spectrum[zero_bin_idx])
        one_magnitude = np.abs(spectrum[one_bin_idx])
        if zero_magnitude > one_magnitude:
            symbols += b"0"
        elif one_magnitude > zero_magnitude:
            symbols += b"1"
        else:
            symbols += b"E"
    return bytes(symbols)
def decode_bits(bits: bytes) -> bytes:
    """Decode a stream of ASCII bit symbols into data bytes.

    Each data byte is carried by an 11-symbol frame: one ``"0"`` start bit,
    eight data bits (least-significant bit first) and two ``"1"`` stop bits.
    Leading ``"1"`` symbols are skipped before framing starts.

    When the symbols at the current position do not form a valid frame, a
    ``"."`` is written to stderr and decoding retries one symbol later.
    The marker goes to stderr so it cannot end up in decoded data that the
    caller writes to stdout.

    Args:
        bits: Symbol stream made of the bytes ``b"0"`` and ``b"1"``. Any other
            symbol in a data position is counted as a 1 bit.

    Returns:
        The decoded bytes, in stream order.
    """
    import sys

    frame_len = 11
    zero = ord(b'0')
    one = ord(b'1')

    # Skip the leading one bits by moving a cursor rather than copying.
    pos = len(bits) - len(bits.lstrip(b"1"))
    end = len(bits)
    byte_data = bytearray()

    while end - pos >= frame_len:
        start_ok = bits[pos] == zero
        stop_ok = bits[pos + 9] == one and bits[pos + 10] == one
        if start_ok and stop_ok:
            byte_value = 0
            for i in range(8):
                if bits[pos + 1 + i] != zero:
                    byte_value |= 1 << i
            byte_data.append(byte_value)
            pos += frame_len
        else:
            # Framing error: report it and slide forward by one symbol.
            print(".", end="", file=sys.stderr)
            pos += 1
    return bytes(byte_data)
def main(arg):
    """Decode a KCS-encoded WAV file and write out the recovered bytes.

    Args:
        arg: Command-line arguments without the program name. ``arg[0]`` is
            the WAV file to read; the optional ``arg[1]`` is the output file.
            Without it the decoded bytes are written to stdout.

    Prints a usage line and returns when no arguments are given.
    """
    if len(arg) < 1:
        print("Usage: program.py wave-file [output-file]")
        return

    audio_file = arg[0]
    data_file = stdout if len(arg) < 2 else arg[1]

    # Load the audio file
    sample_rate, samples = wav.read(audio_file)

    # Work in floating point: taking abs() of integer PCM overflows for the
    # most negative sample value (e.g. -32768 in int16).
    samples = np.asarray(samples, dtype=np.float64)

    # Multi-channel files come back as (frames, channels); mix down to mono.
    if samples.ndim > 1:
        samples = samples.mean(axis=1)

    peak = np.max(np.abs(samples)) if samples.size else 0.0
    if peak > 0:
        # Normalize the samples to [-1.0, 1.0]
        samples = samples / peak
        # Demodulate the KCS signal
        symbols = decode_symbols(samples, sample_rate)
        data = decode_bits(symbols)
    else:
        # Empty or completely silent input carries no data.
        data = b""

    if data_file == stdout:
        stdout.buffer.write(data)
    else:
        with open(data_file, 'wb') as dat:
            dat.write(data)
# Run the decoder only when this file is executed as a script, passing the
# command-line arguments without the program name.
if __name__ == "__main__":
    main(argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment