Skip to content

Instantly share code, notes, and snippets.

@stmobo
Last active March 12, 2019 03:28
Show Gist options
  • Save stmobo/761627d6f4b33c9e9b6923cf4fc0a9bc to your computer and use it in GitHub Desktop.
Save stmobo/761627d6f4b33c9e9b6923cf4fc0a9bc to your computer and use it in GitHub Desktop.
import numpy as np
import matplotlib.pyplot as plt
signal_length = 300
timespan = 30
t = np.linspace(0, timespan, signal_length)
def impulse(a, x, shift, max_amp):
t = ((x + shift)/a)**2
y = np.exp(-t) / (np.abs(a) * np.sqrt(np.pi))
sf = max_amp / (np.amax(y) - np.amin(y))
return y * sf
def sinewave(f, A, t, ps):
return A * np.sin(2*np.pi*f*(t+ps))
# tuning max Y acc: +/- 0.5 m/s
signal_components = [
(np.maximum(sinewave(1/10, 0.5, t, 0), 0), 'r'),
((impulse(1/3, t, -5, 0.75)), 'g'),
((impulse(1/7, t, -7, 0.75)), 'g'),
((-impulse(1/7, t, -16, 0.75)), 'g'),
((impulse(1, t, -23, 1.25)), 'g'),
((np.random.standard_normal(signal_length) * 0.05), 'b'),
]
raw_signal = sum(map(lambda o: o[0], signal_components))
#raw_signal[0] = 0
def highpass(x, dt, RC):
y = np.zeros(x.shape)
a = RC / (RC + dt)
#y[0] = x[0]
y[0] = 0
for i in range(1, len(x)):
y[i] = a * (y[i-1] + x[i] - x[i-1])
return y
def lowpass(x, dt, RC):
y = np.zeros(x.shape)
a = RC / (RC + dt)
#y[0] = a * x[0]
y[0] = 0
for i in range(1, len(x)):
y[i] = y[i-1] + (a * (x[i] - y[i-1]))
return y
dt = timespan / signal_length
plt.subplot(211)
for comp, color in signal_components:
plt.plot(t, comp, color+'--')
plt.plot(t, raw_signal, 'k-', label='Raw')
plt.legend()
plt.subplot(212)
def rescale_signal(x, out_min, out_max):
x_max = np.amax(x)
x_min = np.amin(x)
return (((out_max - out_min)*(x - x_min)) / (x_max - x_min)) + out_min
def moving_average_convolve(input_signal, kernel_sz):
kernel = np.ones(kernel_sz) / kernel_sz
return np.convolve(input_signal, kernel, mode='same')
def filter_signal(input_signal):
stage1_filter_sz = 5
highpass_cutoff = 4.5
highpass_RC = 1 / (2*np.pi*highpass_cutoff)
stage2_filter_sz = 3
input_min = np.amin(input_signal)
input_max = np.amax(input_signal)
filter_1 = moving_average_convolve(input_signal, stage1_filter_sz)
filter_2 = highpass(filter_1, dt, highpass_RC)
filter_3 = moving_average_convolve(filter_2, stage2_filter_sz)
out = (filter_3 - np.mean(filter_3)) / np.std(filter_3)
#plt.plot(t, filter_2, 'r--')
#out = rescale_signal(filter_3, -1, 1)
#plt.plot(t, filter_1, 'r-', label="Stage 1")
#plt.plot(t, filter_2, 'b-', label="Stage 2")
#plt.plot(t, filter_3, 'g-', label="Stage 3")
plt.plot(t, out, 'k-', label="Output")
plt.plot(t, 1.25 * np.ones(signal_length), 'c--')
plt.plot(t, -1.25 * np.ones(signal_length), 'c--')
filter_signal(raw_signal)
plt.legend()
#plot_many(raw_signal, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
plt.show()
import serial
import struct
import numpy as np
import sys
import time
import matplotlib.pyplot as plt
def waitForBytes(port, byteSeq):
while True:
for idx, b in enumerate(byteSeq):
r = port.read(1)[0]
if r != b:
break # restart sequence
else:
return
frame_fmt = "<Bxxx" + ('f'*13) + 'BB'
def readFrame(port):
waitForBytes(port, bytes([0xA5, 0x5A]))
size = port.read(1)[0]
recv = [size]
recv.extend(port.read(size-1))
recv = bytes(recv)
frame = struct.unpack(frame_fmt, recv)
checksum = 0
for b in recv:
checksum ^= b
if checksum != 0:
print("Invalid frame received (got checksum {:02x}, expected 00)".format(checksum))
return
flags = frame[-2]
gps_valid = (flags & 1) > 0
lat_north = (flags & (1<<1)) > 0
lon_east = (flags & (1<<2)) > 0
return {
'size': frame[0],
'calib': np.array(frame[1:4]),
'acc': np.array(frame[4:7]),
'gyro': np.array(frame[7:10]),
'time': frame[10],
'gps': np.array(frame[11:13]),
'altitude': np.array(frame[13]),
'gps_valid': gps_valid,
'latitude_north': lat_north,
'longitude_east': lon_east
}
def highpass(x, dt, RC):
y = np.zeros(x.shape)
a = RC / (RC + dt)
#y[0] = x[0]
y[0] = 0
for i in range(1, len(x)):
y[i] = a * (y[i-1] + x[i] - x[i-1])
return y
def lowpass(x, dt, RC):
y = np.zeros(x.shape)
a = RC / (RC + dt)
#y[0] = a * x[0]
y[0] = 0
for i in range(1, len(x)):
y[i] = y[i-1] + (a * (x[i] - y[i-1]))
return y
def moving_average_convolve(input_signal, kernel_sz):
kernel = np.ones(kernel_sz) / kernel_sz
return np.convolve(input_signal, kernel, mode='same')
def filter_signal(input_signal, dt):
stage1_filter_sz = 7
highpass_cutoff = 4.5
highpass_RC = 1 / (2*np.pi*highpass_cutoff)
stage2_filter_sz = 17
if stage1_filter_sz % 2 == 0:
stage1_filter_sz += 1
if stage2_filter_sz % 2 == 0:
stage2_filter_sz += 1
input_min = np.amin(input_signal)
input_max = np.amax(input_signal)
filter_1 = moving_average_convolve(input_signal, stage1_filter_sz)
filter_2 = highpass(filter_1, dt, highpass_RC)
filter_3 = moving_average_convolve(filter_2, stage2_filter_sz)
#return (filter_3 - np.mean(filter_3)) / np.std(filter_3)
return filter_3
def main():
port = serial.Serial('COM28')
calib_vector = np.zeros(3)
data = []
t = []
starttime = time.perf_counter()
print('')
plt.ion()
fig = plt.figure()
ax = fig.add_subplot(111)
last_draw_time = 0
time.sleep(1)
while True:
frame = readFrame(port)
calib_vector = frame['calib']
data.append(frame['acc'])
t.append(time.perf_counter() - starttime)
if time.perf_counter() - last_draw_time > 1 and len(t) > 50:
acc_signal = np.dot(data, calib_vector) / np.sqrt(calib_vector.dot(calib_vector))
dt = np.mean(np.convolve(t, [1, -1], mode='valid'))
filtered = filter_signal(acc_signal, dt)
ax.clear()
ax.plot(t, filtered, 'k-', label="Output")
ax.plot(t, 0.07 * np.ones(len(filtered)), 'c--')
ax.plot(t, -0.07 * np.ones(len(filtered)), 'c--')
time_cutoff = max(t[-1]-30, 0)
_t = np.array(t)
idx = _t >= time_cutoff
y_max = max(np.amax(filtered[idx]), 0.07)
y_min = min(np.amin(filtered[idx]), -0.07)
events = np.argwhere(np.abs(filtered[idx]) > 0.07)
for i in events:
ax.axvline(x=_t[idx][i], color='r')
ax.set_xlim(time_cutoff, t[-1]+0.5)
ax.set_ylim(y_min-0.05, y_max+0.05)
#ax.plot(t, acc_signal, 'r-')
plt.pause(0.0001)
last_draw_time = time.perf_counter()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment