Created
January 11, 2020 19:18
-
-
Save robots/ae3e7286d408c901836666418597ea86 to your computer and use it in GitHub Desktop.
Testing RS41 radiosonde demodulation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# excercise for FSK demodulator - Michal Demin | |
# wav file reader and rs41 details from https://github.com/rs1729/RS | |
# raised root cosine filter generation from soundmodem project: https://gitlab.com/tsailer/soundmodem | |
# fsk demod inspired by soundmodem, but completly rewritten | |
# demodulator is not the fastest one (fir filter is implemented very badly), but it is working | |
# this was written as excercise to test fskdemodulator on PC before implementing MCU version for stm32(stay tuned) | |
import numpy as np | |
import struct | |
import math | |
import sys | |
import matplotlib.pyplot as plt | |
class vav: | |
def __init__(self, fname): | |
self.fin = open(fname, "rb") | |
self.read_hdr() | |
def read_hdr(self): | |
fin = self.fin | |
d = fin.read(4) | |
if not d == b'RIFF': | |
return False | |
d = fin.read(4) | |
d = fin.read(4) | |
if not d == b'WAVE': | |
return False | |
d = fin.read(4) | |
if not d == b'fmt ': | |
return False | |
d = fin.read(4) | |
d = fin.read(2) | |
d = fin.read(2) | |
self.chs, = struct.unpack("<H", d) | |
d = fin.read(4) | |
self.samprate, = struct.unpack("<i", d) | |
d = fin.read(4) | |
d = fin.read(2) | |
d = fin.read(2) | |
self.bps, = struct.unpack("<H", d) | |
da = b'' | |
while True: | |
da += fin.read(1) | |
if da == b'data': | |
break | |
if len(da) > 4: | |
da = da[1:] | |
fin.read(4) | |
print("Samplerate", self.samprate) | |
print("bits", self.bps) | |
print("chans", self.chs) | |
return True | |
def read_sample(self, ch = 0): | |
fin = self.fin | |
sample = 0 | |
for i in range(0, self.chs): | |
if self.bps == 8: | |
dat = fin.read(1) | |
elif self.bps == 16: | |
dat = fin.read(2) | |
if len(dat) == 0: | |
return False | |
if ch == i: | |
if self.bps == 8: | |
sample, = struct.unpack("<b", dat) | |
elif self.bps == 16: | |
sample, = struct.unpack("<h", dat) | |
return sample | |
class filt: | |
def __init__(self, bps, samplerate, window=0): | |
RCOSALPHA = (3.0/8) | |
filterover = 16 | |
self.firlen = int(int((samplerate * 8 + bps - 1)) / bps); | |
if self.firlen > 64: | |
print("Firlen, large", self.firlen) | |
# self.firlen = 64 | |
print("Firlen", self.firlen) | |
tmul = bps / filterover / samplerate; | |
self.coeff = np.zeros( (filterover, self.firlen), dtype = np.int32 ) | |
ct = np.zeros( (filterover, self.firlen) ) | |
for i in range(0, filterover * self.firlen): | |
t = int(i - self.firlen * filterover / 2) * tmul; | |
idx1 = int(i % filterover) | |
idx2 = int(i / filterover) | |
ct[idx1][idx2] = self.rrc_t(t, RCOSALPHA); | |
#print("Filter coefs", self.coeff) | |
max1 = 0 | |
for i in range(0, filterover): | |
max2 = 0 | |
for j in range(0, self.firlen): | |
max2 += abs(ct[i][j]) | |
if max2 > max1: | |
max1 = max2 | |
max2 = (0x3fffffff / 0x7fff) / max1; | |
for i in range(0, filterover): | |
for j in range(0, self.firlen): | |
self.coeff[i][j] = max2 * ct[i][j]; | |
#f1 += self.coeff[i][j] * self.coeff[i][j]; | |
#pulseen[i] = f1; | |
print("scaled filtercoefs", self.coeff) | |
def rrc_t(self, time, alpha): | |
if abs(alpha) < 1e-8: | |
return self.sinc(time); | |
if abs(time) < 1e-8: | |
return ((4.0 / math.pi) - 1.0) * alpha + 1.0; | |
opap = (1.0 + alpha) * math.pi; | |
omap = (1.0 - alpha) * math.pi; | |
at4 = alpha * time * 4; | |
omat4sq = 1 - at4 * at4; | |
if abs(omat4sq) < 1e-8: | |
return ((4.0 * alpha) * math.cos(opap * time) - at4 * opap * math.sin(opap * time) + omap * math.cos(omap * time)) / (1 - 3 * at4 * at4) * (1.0 / math.pi); | |
return (4.0 / math.pi) * alpha / omat4sq * (math.cos(opap * time) + math.sin(omap * time) / at4); | |
def sinc(self, x): | |
arg = x * math.pi | |
if abs(arg) < 1e-10: | |
return 1; | |
return math.sin(arg) / arg; | |
def filter(self, samples, phase): | |
f = int((phase >> 12) & 0x0f) | |
#print (phase , f) | |
s = 0 | |
for i in range(0, self.firlen): | |
s += self.coeff[f][i] * samples[-i] | |
return int(s / (2**16)) | |
class fsk: | |
def __init__(self, bps, samplerate, debug = False): | |
self.debug = debug | |
self.bitspersample = int(samplerate/bps) | |
self.pllmax = self.bitspersample * 8 | |
self.pllinc = 8 | |
print("pllinc", self.pllinc) | |
self.pll = 0 | |
self.dco = 0 | |
self.dco_a = 0 | |
self.dco_c = 0 | |
self.lastcurs = 0 | |
self.filter = filt(bps, samplerate) | |
self.samples = [0] * self.filter.firlen | |
self.bits = 0 | |
self.outbits = 0 | |
self.outbitc = 0 | |
def proc_sample(self, sample): | |
self.samples.pop(0) | |
self.samples.append(sample) | |
if self.debug: | |
print("pll", self.pll) | |
curs = self.filter.filter(self.samples, 0)#int(self.pll)) | |
# self.dco_a += curs | |
# self.dco_c += 1 | |
# if self.dco_c >= 4096: | |
# self.dco = int(self.dco_a / 4096) | |
# self.dco_a = 0 | |
# self.dco_c = 0 | |
# print("DCO", self.dco) | |
curs -= self.dco | |
rawbit = (int(curs) > 0) & 1 | |
#rawit = (int(curs) >> 15) & 1 | |
self.pll += self.pllinc | |
self.pll = int(self.pll) | |
# zero cross detected, adjust pll | |
if rawbit != self.bits & 1: | |
if self.pll < self.pllmax / 2: | |
self.pll -= 1 | |
if self.debug: | |
print("pll -") | |
else: | |
if self.debug: | |
print("pll +") | |
self.pll += 1 | |
if self.debug: | |
print("pll %04d rawbit %d bits %012x" % (self.pll, rawbit, self.bits)) | |
# save bit | |
self.bits <<= 1 | |
self.bits |= rawbit | |
self.bits &= 0xffffffffffff | |
ob = 0 | |
# evaluate bits | |
if self.pll >= self.pllmax: | |
self.pll %= self.pllmax | |
bitcount = 0 | |
for i in range(0, self.bitspersample): | |
if self.bits & (1 << i): | |
bitcount += 1 | |
if bitcount > self.bitspersample/2: | |
bit = 0 | |
ob = -1 | |
else: | |
bit = 1 | |
ob = 1 | |
# self.outbitc += 1 | |
# self.outbits <<= 1 | |
# self.outbits |= bit | |
# | |
# #print(curs) | |
# if self.outbitc == 8: | |
## print("byte %02x" % self.outbits) | |
# self.outbitc = 0 | |
# self.outbits = 0 | |
# | |
# print("%012x %02d %d" % (self.bits, bitcount, ob)) | |
return curs, self.pll, ob | |
class rs41: | |
mask = [ 0x96, 0x83, 0x3E, 0x51, 0xB1, 0x49, 0x08, 0x98, 0x32, 0x05, 0x59, 0x0E, 0xF9, 0x44, 0xC6, 0x26, 0x21, 0x60, 0xC2, 0xEA, 0x79, 0x5D, 0x6D, 0xA1, 0x54, 0x69, 0x47, 0x0C, 0xDC, 0xE8, 0x5C, 0xF1, 0xF7, 0x76, 0x82, 0x7F, 0x07, 0x99, 0xA2, 0x2C, 0x93, 0x7C, 0x30, 0x63, 0xF5, 0x10, 0x2E, 0x61, 0xD0, 0xBC, 0xB4, 0xB6, 0x06, 0xAA, 0xF4, 0x23, 0x78, 0x6E, 0x3B, 0xAE, 0xBF, 0x7B, 0x4C, 0xC1 ] | |
hdr = [ 0x10, 0xB6, 0xCA, 0x11, 0x22, 0x96, 0x12, 0xF8 ] | |
#hdr64 = 0x10B6CA11229612F8 | |
#nhdr64 = 0x8635f44093df1a60 | |
header = [0,0,0,0,1,0,0,0,0,1,1,0,1,1,0,1,0,1,0,1,0,0,1,1,1,0,0,0,1,0,0,0,0,1,0,0,0,1,0,0,0,1,1,0,1,0,0,1,0,1,0,0,1,0,0,0,0,0,0,1,1,1,1,1] | |
nheader = [1,1,1,1,0,1,1,1,1,0,0,1,0,0,1,0,1,0,1,0,1,1,0,0,0,1,1,1,0,1,1,1,1,0,1,1,1,0,1,1,1,0,0,1,0,1,1,0,1,0,1,1,0,1,1,1,1,1,1,0,0,0,0,0] | |
hdrsearch = [0] * 64 | |
def __init__(self): | |
self.state = 0 | |
self.idx = 0 | |
self.frame = bytearray(b'\x00'*518) | |
self.explen = 518 | |
self.bits = 0 | |
def process_bit(self, bit): | |
if self.state == 0: | |
self.hdrsearch.pop(0) | |
self.hdrsearch.append(bit) | |
if self.hdrsearch == self.header: | |
# got header | |
self.state = 1 | |
self.bitc = 0 | |
self.bits = 0 | |
self.idx = 8 | |
print("got hdr") | |
if self.hdrsearch == self.nheader: | |
self.state = 2 | |
self.bitc = 0 | |
self.bits = 0 | |
self.idx = 8 | |
print("got hdr inv") | |
else: | |
self.bits >>= 1 | |
if bit: | |
self.bits |= 0x80 | |
self.bitc += 1 | |
if self.bitc == 8: | |
if self.state == 2: | |
byte = (0xff - self.bits) ^ self.mask[self.idx % len(self.mask)] | |
else: | |
byte = (self.bits) ^ self.mask[self.idx % len(self.mask)] | |
# print("%d %08x %08x %08x " % (self.idx, self.bits, self.mask[self.idx % len(self.mask)], byte)) | |
self.frame[self.idx] = byte | |
if self.idx == 56: | |
if byte == 0xf0: | |
self.explen = 518 | |
elif byte == 0x0f: | |
self.explen = 320 | |
else: | |
print("unknown frame") | |
self.state = 0 | |
return | |
print("explen", self.explen) | |
self.idx += 1 | |
if self.idx == self.explen: | |
self.process_frame() | |
print("got frame") | |
self.idx = 0 | |
self.state = 0 | |
self.bitc = 0 | |
self.bits = 0 | |
def process_frame(self): | |
print(self.idx, self.frame) | |
pass | |
wav = vav(sys.argv[1]) | |
f = fsk(4800, wav.samprate) | |
rs = rs41() | |
samples = [] | |
curs = [] | |
plls = [] | |
bits = [] | |
while True: | |
s = wav.read_sample() | |
if not s: | |
print("end") | |
break | |
c, pll, b = f.proc_sample(s) | |
if b == -1: | |
rs.process_bit(0) | |
elif b == 1: | |
rs.process_bit(1) | |
samples.append(s) | |
bits.append(b*10000) | |
curs.append(c) | |
plls.append(pll) | |
#plt.ion() | |
#plt.axis('equal') | |
plt.grid() | |
#ig = plt.figure(figsize=(6,9)) | |
#fig.canvas.mpl_connect('close_event', handle_close) | |
plt.plot(range(0, len(samples)), samples, color='b', label="samples", alpha=0.7) | |
plt.plot(range(0, len(samples)), bits, color='k', label="bits") | |
plt.plot(range(0, len(samples)), plls, color='r', label="phase") | |
plt.plot(range(0, len(samples)), curs, color='m', alpha = 0.7) | |
plt.tight_layout() | |
plt.show() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# simple frame parser for rs41, position only, crc is checked | |
# gps coordinate conversion from https://github.com/rs1729/RS | |
import math | |
import struct | |
def crc16(data: bytes, poly=0x1021): | |
data = bytearray(data) | |
crc = 0xFFFF | |
for b in data: | |
crc = crc ^ (b << 8) | |
for _ in range(0, 8): | |
if (crc & 0x8000): | |
crc = (crc << 1) ^ poly | |
else: | |
crc <<= 1 | |
crc = crc & 0xffff | |
return crc | |
EARTH_a = 6378137.0 | |
EARTH_b = 6356752.31424518 | |
EARTH_a2_b2 = (EARTH_a*EARTH_a - EARTH_b*EARTH_b) | |
a = EARTH_a | |
b = EARTH_b | |
a_b = EARTH_a2_b2 | |
e2 = EARTH_a2_b2 / (EARTH_a*EARTH_a) | |
ee2 = EARTH_a2_b2 / (EARTH_b*EARTH_b) | |
def ecef2elli(x,y,z, vx,vy,vz): | |
lam = math.atan2( y , x ) | |
p = math.sqrt( x*x + y*y ) | |
t = math.atan2( z *a , p*b ) | |
phi = math.atan2( z + ee2 * b * math.sin(t)*math.sin(t)*math.sin(t) , p - e2 * a * math.cos(t)*math.cos(t)*math.cos(t) ) | |
R = a / math.sqrt( 1 - e2 * math.sin(phi) * math.sin(phi) ) | |
alt = p / math.cos(phi) - R | |
lat = phi * 180/ math.pi | |
lon = lam * 180/ math.pi | |
vN = -vx * math.sin(phi)*math.cos(lam) - vy*math.sin(phi)*math.sin(lam) + vz*math.cos(phi) | |
vE = -vx * math.sin(lam) + vy*math.cos(lam); | |
vU = vx * math.cos(phi)*math.cos(lam) + vy*math.cos(phi)*math.sin(lam) + vz*math.sin(phi); | |
vH = math.sqrt(vN * vN + vE * vE); | |
h = math.atan2(vE, vN) * 180 / math.pi; | |
if h < 0: | |
h += 360; | |
return lat, lon, alt, vN, vE, vU, vH, h | |
test = bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00\x07W\x8eGZ\xfb\xba\x05\x0b\x91\xbc\x7f\xe8\x90\x05O\xe8\xd6\x9d\xa7\xc6xw\xc3k\xa1\x8a\xaf\x92\xac\x1a\x01M_X\xa2:"\xceP&\xe1\x19B\xd8i\xc4\xe4\x0fy(\xc1\x1eP4220234\x1a\x00\x00\x03\x00\x00\x13@\x00\x00\x00\x072\x12\x00\x00\x00\x00\x00*\xe9s\xc3_(@>\xbb\x92\t\x0fLz*\x00\x00\x00\xd0\xfb\x01%\xe1\x02\xc6F\x08\xdf=\x07aI\x08f\x04\x02\xcf\xfb\x01,\xe1\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19w|\x1e\xf8\x07\x08\x8e\xe4\x02\x13\xb8\x1c\xba\x1e\x93\x11\xdd\n\x92\x0c\x95\x18\xf6\x0f\xf7\x12\x90\x14\x91\x01\x91\r\xf8]\xf7}YR\xf0<\x01\xffLoH\t\xcc\r\xff\x8a!j\x0bQ\xb5\x00\xeb%\x9e\x18\xe5\xd3\x00\xa7\xb6\xda\x06\xba\xa9\xff\x14C\x0b\x16\xb7\x96\xff\xf4\xd03\x13\xcc\xe2\xfeQ\xcb\xb5\x03#O\xff4\x00\x00\x00\t3\x00\xd6o2\x1a[\x04\x00Y\xb9#\x12\x01%\x00\xa5Y\x13\x1ak\x91\xff-\x0fm\x02y\xa8\x00\xf6-{\x15\xcd\xc5\xf4\x17\xa5\x19&\x081\xdbV\x1c\x9b\xf9\xad\x08\xcb\xfa\x0b\x03\x0e\x15\xbev\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xec\xc7\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') | |
if test[0x38] == 0x0f: | |
framelen = 320 | |
else: | |
framelen = 518 | |
i = 0x39 | |
while True: | |
bloktype, bloklen = struct.unpack("<BB", test[i:i+2]) | |
blokdata = test[i+2:i+2+bloklen] | |
blokcrc, = struct.unpack("<H", test[i+bloklen+2:i+bloklen+4]) | |
i = i+bloklen+4 | |
if i >= framelen: | |
break | |
if not bloktype == 0x76 and not crc16(blokdata) == blokcrc: | |
print("crc nok") | |
print("%d %02x %d " %(i, bloktype, bloklen), blokdata) | |
if bloktype == 0x79: # status | |
print("status", struct.unpack("<H8sHIIBHB16s", blokdata)) | |
elif bloktype == 0x7a: # meas | |
pass | |
elif bloktype == 0x7c: # gpsinfo | |
gpsweek, gpssec, = struct.unpack("<HI", blokdata[:6]) | |
print(gpsweek, gpssec) | |
elif bloktype == 0x7d: # gpsraw | |
pass | |
elif bloktype == 0x7b: # gpspos | |
x,y,z, vx, vy, vz, sats, sa, pdop = struct.unpack("<iiihhhBBB", blokdata) | |
print(x,y,z, vx,vy,vz,sats) | |
print(ecef2elli(x/100,y/100,z/100,vx/100, vy/100, vz/100)) | |
elif bloktype == 0x76: # empty | |
pass | |
else: | |
break | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment