Skip to content

Instantly share code, notes, and snippets.

@VedantParanjape
Created March 20, 2020 17:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save VedantParanjape/49746d0b34f3b6e807facd73185b2aa1 to your computer and use it in GitHub Desktop.
Save VedantParanjape/49746d0b34f3b6e807facd73185b2aa1 to your computer and use it in GitHub Desktop.
import traceback
import numpy as np
import pyModeS as pms
from rtlsdr import RtlSdr
import time
sampling_rate = 2e6
smaples_per_microsec = 2
modes_frequency = 1090e6
buffer_size = 1024 * 200
read_size = 1024 * 100
pbits = 8
fbits = 112
preamble = [1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0]
th_amp_diff = 0.8 # signal amplitude threshold difference between 0 and 1 bit
class adsb_decoder(object):
def __init__(self, **kwargs):
super(adsb_decoder, self).__init__()
self.signal_buffer = [] # amplitude of the sample only
self.debug = kwargs.get("debug", False)
self.raw_pipe_in = None
self.stop_flag = False
self.noise_floor = 1e6
def _calc_noise(self):
"""Calculate noise floor"""
window = smaples_per_microsec * 100
total_len = len(self.signal_buffer)
means = (
np.array(self.signal_buffer[: total_len // window * window])
.reshape(-1, window)
.mean(axis=1)
)
return min(means)
def _process_buffer(self):
"""process raw IQ data in the buffer"""
# update noise floor
self.noise_floor = min(self._calc_noise(), self.noise_floor)
# set minimum signal amplitude
min_sig_amp = 3.162 * self.noise_floor # 10 dB SNR
# Mode S messages
messages = []
buffer_length = len(self.signal_buffer)
i = 0
while i < buffer_length:
if self.signal_buffer[i] < min_sig_amp:
i += 1
continue
if self._check_preamble(self.signal_buffer[i : i + pbits * 2]):
frame_start = i + pbits * 2
frame_end = i + pbits * 2 + (fbits + 1) * 2
frame_length = (fbits + 1) * 2
frame_pulses = self.signal_buffer[frame_start:frame_end]
threshold = max(frame_pulses) * 0.2
msgbin = []
for j in range(0, frame_length, 2):
p2 = frame_pulses[j : j + 2]
if len(p2) < 2:
break
if p2[0] < threshold and p2[1] < threshold:
break
elif p2[0] >= p2[1]:
c = 1
elif p2[0] < p2[1]:
c = 0
else:
msgbin = []
break
msgbin.append(c)
# advance i with a jump
i = frame_start + j
if len(msgbin) > 0:
msghex = pms.bin2hex("".join([str(i) for i in msgbin]))
if self._check_msg(msghex):
messages.append([msghex, time.time()])
if self.debug:
self._debug_msg(msghex)
# elif i > buffer_length - 500:
# # save some for next process
# break
else:
i += 1
# reset the buffer
self.signal_buffer = self.signal_buffer[i:]
return messages
def _check_preamble(self, pulses):
if len(pulses) != 16:
return False
for i in range(16):
if abs(pulses[i] - preamble[i]) > th_amp_diff:
return False
return True
def _check_msg(self, msg):
df = pms.df(msg)
msglen = len(msg)
if df == 17 and msglen == 28:
if pms.crc(msg) == 0:
return True
elif df in [20, 21] and msglen == 28:
return True
elif df in [4, 5, 11] and msglen == 14:
return True
def _debug_msg(self, msg):
df = pms.df(msg)
msglen = len(msg)
if df == 17 and msglen == 28:
print(msg, pms.icao(msg), pms.crc(msg))
elif df in [20, 21] and msglen == 28:
print(msg, pms.icao(msg))
elif df in [4, 5, 11] and msglen == 14:
print(msg, pms.icao(msg))
else:
# print("[*]", msg)
pass
def _read_callback(self, data, rtlsdr_obj):
amp = np.absolute(data)
self.signal_buffer.extend(amp.tolist())
if len(self.signal_buffer) >= buffer_size:
messages = self._process_buffer()
self.handle_messages(messages)
def handle_messages(self, messages):
"""re-implement this method to handle the messages"""
for msg, t in messages:
# print("%15.9f %s" % (t, msg))
pass
def run(self, data, raw_pipe_in=None, stop_flag=None, exception_queue=None):
self.raw_pipe_in = raw_pipe_in
self.stop_flag = stop_flag
try:
# raise RuntimeError("test exception")
self._read_callback(data, None)
except Exception as e:
tb = traceback.format_exc()
if exception_queue is not None:
exception_queue.put(tb)
raise e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment