Skip to content

Instantly share code, notes, and snippets.

@robots
Created January 11, 2020 19:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save robots/ae3e7286d408c901836666418597ea86 to your computer and use it in GitHub Desktop.
Save robots/ae3e7286d408c901836666418597ea86 to your computer and use it in GitHub Desktop.
Testing RS41 radiosonde demodulation
# excercise for FSK demodulator - Michal Demin
# wav file reader and rs41 details from https://github.com/rs1729/RS
# raised root cosine filter generation from soundmodem project: https://gitlab.com/tsailer/soundmodem
# fsk demod inspired by soundmodem, but completly rewritten
# demodulator is not the fastest one (fir filter is implemented very badly), but it is working
# this was written as excercise to test fskdemodulator on PC before implementing MCU version for stm32(stay tuned)
import numpy as np
import struct
import math
import sys
import matplotlib.pyplot as plt
class vav:
def __init__(self, fname):
self.fin = open(fname, "rb")
self.read_hdr()
def read_hdr(self):
fin = self.fin
d = fin.read(4)
if not d == b'RIFF':
return False
d = fin.read(4)
d = fin.read(4)
if not d == b'WAVE':
return False
d = fin.read(4)
if not d == b'fmt ':
return False
d = fin.read(4)
d = fin.read(2)
d = fin.read(2)
self.chs, = struct.unpack("<H", d)
d = fin.read(4)
self.samprate, = struct.unpack("<i", d)
d = fin.read(4)
d = fin.read(2)
d = fin.read(2)
self.bps, = struct.unpack("<H", d)
da = b''
while True:
da += fin.read(1)
if da == b'data':
break
if len(da) > 4:
da = da[1:]
fin.read(4)
print("Samplerate", self.samprate)
print("bits", self.bps)
print("chans", self.chs)
return True
def read_sample(self, ch = 0):
fin = self.fin
sample = 0
for i in range(0, self.chs):
if self.bps == 8:
dat = fin.read(1)
elif self.bps == 16:
dat = fin.read(2)
if len(dat) == 0:
return False
if ch == i:
if self.bps == 8:
sample, = struct.unpack("<b", dat)
elif self.bps == 16:
sample, = struct.unpack("<h", dat)
return sample
class filt:
def __init__(self, bps, samplerate, window=0):
RCOSALPHA = (3.0/8)
filterover = 16
self.firlen = int(int((samplerate * 8 + bps - 1)) / bps);
if self.firlen > 64:
print("Firlen, large", self.firlen)
# self.firlen = 64
print("Firlen", self.firlen)
tmul = bps / filterover / samplerate;
self.coeff = np.zeros( (filterover, self.firlen), dtype = np.int32 )
ct = np.zeros( (filterover, self.firlen) )
for i in range(0, filterover * self.firlen):
t = int(i - self.firlen * filterover / 2) * tmul;
idx1 = int(i % filterover)
idx2 = int(i / filterover)
ct[idx1][idx2] = self.rrc_t(t, RCOSALPHA);
#print("Filter coefs", self.coeff)
max1 = 0
for i in range(0, filterover):
max2 = 0
for j in range(0, self.firlen):
max2 += abs(ct[i][j])
if max2 > max1:
max1 = max2
max2 = (0x3fffffff / 0x7fff) / max1;
for i in range(0, filterover):
for j in range(0, self.firlen):
self.coeff[i][j] = max2 * ct[i][j];
#f1 += self.coeff[i][j] * self.coeff[i][j];
#pulseen[i] = f1;
print("scaled filtercoefs", self.coeff)
def rrc_t(self, time, alpha):
if abs(alpha) < 1e-8:
return self.sinc(time);
if abs(time) < 1e-8:
return ((4.0 / math.pi) - 1.0) * alpha + 1.0;
opap = (1.0 + alpha) * math.pi;
omap = (1.0 - alpha) * math.pi;
at4 = alpha * time * 4;
omat4sq = 1 - at4 * at4;
if abs(omat4sq) < 1e-8:
return ((4.0 * alpha) * math.cos(opap * time) - at4 * opap * math.sin(opap * time) + omap * math.cos(omap * time)) / (1 - 3 * at4 * at4) * (1.0 / math.pi);
return (4.0 / math.pi) * alpha / omat4sq * (math.cos(opap * time) + math.sin(omap * time) / at4);
def sinc(self, x):
arg = x * math.pi
if abs(arg) < 1e-10:
return 1;
return math.sin(arg) / arg;
def filter(self, samples, phase):
f = int((phase >> 12) & 0x0f)
#print (phase , f)
s = 0
for i in range(0, self.firlen):
s += self.coeff[f][i] * samples[-i]
return int(s / (2**16))
class fsk:
def __init__(self, bps, samplerate, debug = False):
self.debug = debug
self.bitspersample = int(samplerate/bps)
self.pllmax = self.bitspersample * 8
self.pllinc = 8
print("pllinc", self.pllinc)
self.pll = 0
self.dco = 0
self.dco_a = 0
self.dco_c = 0
self.lastcurs = 0
self.filter = filt(bps, samplerate)
self.samples = [0] * self.filter.firlen
self.bits = 0
self.outbits = 0
self.outbitc = 0
def proc_sample(self, sample):
self.samples.pop(0)
self.samples.append(sample)
if self.debug:
print("pll", self.pll)
curs = self.filter.filter(self.samples, 0)#int(self.pll))
# self.dco_a += curs
# self.dco_c += 1
# if self.dco_c >= 4096:
# self.dco = int(self.dco_a / 4096)
# self.dco_a = 0
# self.dco_c = 0
# print("DCO", self.dco)
curs -= self.dco
rawbit = (int(curs) > 0) & 1
#rawit = (int(curs) >> 15) & 1
self.pll += self.pllinc
self.pll = int(self.pll)
# zero cross detected, adjust pll
if rawbit != self.bits & 1:
if self.pll < self.pllmax / 2:
self.pll -= 1
if self.debug:
print("pll -")
else:
if self.debug:
print("pll +")
self.pll += 1
if self.debug:
print("pll %04d rawbit %d bits %012x" % (self.pll, rawbit, self.bits))
# save bit
self.bits <<= 1
self.bits |= rawbit
self.bits &= 0xffffffffffff
ob = 0
# evaluate bits
if self.pll >= self.pllmax:
self.pll %= self.pllmax
bitcount = 0
for i in range(0, self.bitspersample):
if self.bits & (1 << i):
bitcount += 1
if bitcount > self.bitspersample/2:
bit = 0
ob = -1
else:
bit = 1
ob = 1
# self.outbitc += 1
# self.outbits <<= 1
# self.outbits |= bit
#
# #print(curs)
# if self.outbitc == 8:
## print("byte %02x" % self.outbits)
# self.outbitc = 0
# self.outbits = 0
#
# print("%012x %02d %d" % (self.bits, bitcount, ob))
return curs, self.pll, ob
class rs41:
mask = [ 0x96, 0x83, 0x3E, 0x51, 0xB1, 0x49, 0x08, 0x98, 0x32, 0x05, 0x59, 0x0E, 0xF9, 0x44, 0xC6, 0x26, 0x21, 0x60, 0xC2, 0xEA, 0x79, 0x5D, 0x6D, 0xA1, 0x54, 0x69, 0x47, 0x0C, 0xDC, 0xE8, 0x5C, 0xF1, 0xF7, 0x76, 0x82, 0x7F, 0x07, 0x99, 0xA2, 0x2C, 0x93, 0x7C, 0x30, 0x63, 0xF5, 0x10, 0x2E, 0x61, 0xD0, 0xBC, 0xB4, 0xB6, 0x06, 0xAA, 0xF4, 0x23, 0x78, 0x6E, 0x3B, 0xAE, 0xBF, 0x7B, 0x4C, 0xC1 ]
hdr = [ 0x10, 0xB6, 0xCA, 0x11, 0x22, 0x96, 0x12, 0xF8 ]
#hdr64 = 0x10B6CA11229612F8
#nhdr64 = 0x8635f44093df1a60
header = [0,0,0,0,1,0,0,0,0,1,1,0,1,1,0,1,0,1,0,1,0,0,1,1,1,0,0,0,1,0,0,0,0,1,0,0,0,1,0,0,0,1,1,0,1,0,0,1,0,1,0,0,1,0,0,0,0,0,0,1,1,1,1,1]
nheader = [1,1,1,1,0,1,1,1,1,0,0,1,0,0,1,0,1,0,1,0,1,1,0,0,0,1,1,1,0,1,1,1,1,0,1,1,1,0,1,1,1,0,0,1,0,1,1,0,1,0,1,1,0,1,1,1,1,1,1,0,0,0,0,0]
hdrsearch = [0] * 64
def __init__(self):
self.state = 0
self.idx = 0
self.frame = bytearray(b'\x00'*518)
self.explen = 518
self.bits = 0
def process_bit(self, bit):
if self.state == 0:
self.hdrsearch.pop(0)
self.hdrsearch.append(bit)
if self.hdrsearch == self.header:
# got header
self.state = 1
self.bitc = 0
self.bits = 0
self.idx = 8
print("got hdr")
if self.hdrsearch == self.nheader:
self.state = 2
self.bitc = 0
self.bits = 0
self.idx = 8
print("got hdr inv")
else:
self.bits >>= 1
if bit:
self.bits |= 0x80
self.bitc += 1
if self.bitc == 8:
if self.state == 2:
byte = (0xff - self.bits) ^ self.mask[self.idx % len(self.mask)]
else:
byte = (self.bits) ^ self.mask[self.idx % len(self.mask)]
# print("%d %08x %08x %08x " % (self.idx, self.bits, self.mask[self.idx % len(self.mask)], byte))
self.frame[self.idx] = byte
if self.idx == 56:
if byte == 0xf0:
self.explen = 518
elif byte == 0x0f:
self.explen = 320
else:
print("unknown frame")
self.state = 0
return
print("explen", self.explen)
self.idx += 1
if self.idx == self.explen:
self.process_frame()
print("got frame")
self.idx = 0
self.state = 0
self.bitc = 0
self.bits = 0
def process_frame(self):
print(self.idx, self.frame)
pass
wav = vav(sys.argv[1])
f = fsk(4800, wav.samprate)
rs = rs41()
samples = []
curs = []
plls = []
bits = []
while True:
s = wav.read_sample()
if not s:
print("end")
break
c, pll, b = f.proc_sample(s)
if b == -1:
rs.process_bit(0)
elif b == 1:
rs.process_bit(1)
samples.append(s)
bits.append(b*10000)
curs.append(c)
plls.append(pll)
#plt.ion()
#plt.axis('equal')
plt.grid()
#ig = plt.figure(figsize=(6,9))
#fig.canvas.mpl_connect('close_event', handle_close)
plt.plot(range(0, len(samples)), samples, color='b', label="samples", alpha=0.7)
plt.plot(range(0, len(samples)), bits, color='k', label="bits")
plt.plot(range(0, len(samples)), plls, color='r', label="phase")
plt.plot(range(0, len(samples)), curs, color='m', alpha = 0.7)
plt.tight_layout()
plt.show()
# simple frame parser for rs41, position only, crc is checked
# gps coordinate conversion from https://github.com/rs1729/RS
import math
import struct
def crc16(data: bytes, poly=0x1021):
data = bytearray(data)
crc = 0xFFFF
for b in data:
crc = crc ^ (b << 8)
for _ in range(0, 8):
if (crc & 0x8000):
crc = (crc << 1) ^ poly
else:
crc <<= 1
crc = crc & 0xffff
return crc
EARTH_a = 6378137.0
EARTH_b = 6356752.31424518
EARTH_a2_b2 = (EARTH_a*EARTH_a - EARTH_b*EARTH_b)
a = EARTH_a
b = EARTH_b
a_b = EARTH_a2_b2
e2 = EARTH_a2_b2 / (EARTH_a*EARTH_a)
ee2 = EARTH_a2_b2 / (EARTH_b*EARTH_b)
def ecef2elli(x,y,z, vx,vy,vz):
lam = math.atan2( y , x )
p = math.sqrt( x*x + y*y )
t = math.atan2( z *a , p*b )
phi = math.atan2( z + ee2 * b * math.sin(t)*math.sin(t)*math.sin(t) , p - e2 * a * math.cos(t)*math.cos(t)*math.cos(t) )
R = a / math.sqrt( 1 - e2 * math.sin(phi) * math.sin(phi) )
alt = p / math.cos(phi) - R
lat = phi * 180/ math.pi
lon = lam * 180/ math.pi
vN = -vx * math.sin(phi)*math.cos(lam) - vy*math.sin(phi)*math.sin(lam) + vz*math.cos(phi)
vE = -vx * math.sin(lam) + vy*math.cos(lam);
vU = vx * math.cos(phi)*math.cos(lam) + vy*math.cos(phi)*math.sin(lam) + vz*math.sin(phi);
vH = math.sqrt(vN * vN + vE * vE);
h = math.atan2(vE, vN) * 180 / math.pi;
if h < 0:
h += 360;
return lat, lon, alt, vN, vE, vU, vH, h
test = bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00\x07W\x8eGZ\xfb\xba\x05\x0b\x91\xbc\x7f\xe8\x90\x05O\xe8\xd6\x9d\xa7\xc6xw\xc3k\xa1\x8a\xaf\x92\xac\x1a\x01M_X\xa2:"\xceP&\xe1\x19B\xd8i\xc4\xe4\x0fy(\xc1\x1eP4220234\x1a\x00\x00\x03\x00\x00\x13@\x00\x00\x00\x072\x12\x00\x00\x00\x00\x00*\xe9s\xc3_(@>\xbb\x92\t\x0fLz*\x00\x00\x00\xd0\xfb\x01%\xe1\x02\xc6F\x08\xdf=\x07aI\x08f\x04\x02\xcf\xfb\x01,\xe1\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x19w|\x1e\xf8\x07\x08\x8e\xe4\x02\x13\xb8\x1c\xba\x1e\x93\x11\xdd\n\x92\x0c\x95\x18\xf6\x0f\xf7\x12\x90\x14\x91\x01\x91\r\xf8]\xf7}YR\xf0<\x01\xffLoH\t\xcc\r\xff\x8a!j\x0bQ\xb5\x00\xeb%\x9e\x18\xe5\xd3\x00\xa7\xb6\xda\x06\xba\xa9\xff\x14C\x0b\x16\xb7\x96\xff\xf4\xd03\x13\xcc\xe2\xfeQ\xcb\xb5\x03#O\xff4\x00\x00\x00\t3\x00\xd6o2\x1a[\x04\x00Y\xb9#\x12\x01%\x00\xa5Y\x13\x1ak\x91\xff-\x0fm\x02y\xa8\x00\xf6-{\x15\xcd\xc5\xf4\x17\xa5\x19&\x081\xdbV\x1c\x9b\xf9\xad\x08\xcb\xfa\x0b\x03\x0e\x15\xbev\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xec\xc7\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
if test[0x38] == 0x0f:
framelen = 320
else:
framelen = 518
i = 0x39
while True:
bloktype, bloklen = struct.unpack("<BB", test[i:i+2])
blokdata = test[i+2:i+2+bloklen]
blokcrc, = struct.unpack("<H", test[i+bloklen+2:i+bloklen+4])
i = i+bloklen+4
if i >= framelen:
break
if not bloktype == 0x76 and not crc16(blokdata) == blokcrc:
print("crc nok")
print("%d %02x %d " %(i, bloktype, bloklen), blokdata)
if bloktype == 0x79: # status
print("status", struct.unpack("<H8sHIIBHB16s", blokdata))
elif bloktype == 0x7a: # meas
pass
elif bloktype == 0x7c: # gpsinfo
gpsweek, gpssec, = struct.unpack("<HI", blokdata[:6])
print(gpsweek, gpssec)
elif bloktype == 0x7d: # gpsraw
pass
elif bloktype == 0x7b: # gpspos
x,y,z, vx, vy, vz, sats, sa, pdop = struct.unpack("<iiihhhBBB", blokdata)
print(x,y,z, vx,vy,vz,sats)
print(ecef2elli(x/100,y/100,z/100,vx/100, vy/100, vz/100))
elif bloktype == 0x76: # empty
pass
else:
break
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment