Skip to content

Instantly share code, notes, and snippets.

@AtufaShireen
Created August 7, 2023 08:25
Show Gist options
  • Save AtufaShireen/c40990050b2f8ac348aed660e54701b2 to your computer and use it in GitHub Desktop.
Save AtufaShireen/c40990050b2f8ac348aed660e54701b2 to your computer and use it in GitHub Desktop.
Projecting the Axis of Android to the NED reference frame
import dash
from dash.dependencies import Output, Input
from dash import dcc, html, dcc
from datetime import datetime
import json
import plotly.graph_objs as go
from collections import deque
from flask import Flask, request
import numpy as np
# The Flask instance serves the POST endpoint; the Dash app is mounted on it.
server = Flask(__name__)
app = dash.Dash(__name__, server=server)

MAX_DATA_POINTS = 1000  # capacity shared by every rolling buffer below
UPDATE_FREQ_MS = 100  # interval handed to the dcc.Interval components

# Sample timestamps. NOTE: this name would shadow the stdlib ``time`` module
# if that module were ever imported in this file.
time = deque(maxlen=MAX_DATA_POINTS)

# Raw acceleration components as received.
accel_x, accel_y, accel_z = (deque(maxlen=MAX_DATA_POINTS) for _ in range(3))

# Orientation quaternion components as received.
q_x, q_y, q_z, q_w = (deque(maxlen=MAX_DATA_POINTS) for _ in range(4))

# Acceleration after rotation by the quaternion (filled by transformed_axes).
accel_n, accel_e, accel_d = (deque(maxlen=MAX_DATA_POINTS) for _ in range(3))
# app.layout = html.Div(
# children=[
# dcc.Markdown(
# children="""
# # Live Sensor Readings
# Streamed from Sensor Logger: tszheichoi.com/sensorlogger
# """
# ),
# html.Div([dcc.Graph(id="live_graph",animate=True),]),
# dcc.Interval(id="counter", interval=UPDATE_FREQ_MS),
# ]
# )
# Page layout: two stacked panels, each holding a Markdown heading, a graph
# and the interval timer associated with that panel.
app.layout = html.Div(
    children=[
        html.Div(
            children=[
                dcc.Markdown(children="\n# Live Sensor NED\n"),
                dcc.Graph(id="live_graph1"),
                dcc.Interval(id="counter1", interval=UPDATE_FREQ_MS),
            ]
        ),
        html.Div(
            children=[
                dcc.Markdown(children="\n# Live Sensor Raw\n"),
                dcc.Graph(id="live_graph2"),
                dcc.Interval(id="counter2", interval=UPDATE_FREQ_MS),
            ]
        ),
    ]
)
# from: https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8621449/ (20)
def transformed_axes(ax, ay, az, qw, qx, qy, qz):  # -> w,x,y,z
    """Rotate every acceleration sample by its quaternion and store the result.

    The i-th sample ``(ax[i], ay[i], az[i])`` is multiplied by the rotation
    matrix built from the i-th quaternion ``(qw[i], qx[i], qy[i], qz[i])``.
    The three components of each rotated vector are written to the
    module-level buffers ``accel_n``, ``accel_e`` and ``accel_d``
    (labelled North, East, Down by the original author).

    The output buffers are *replaced* on every call, so calling this
    repeatedly on the same inputs does not accumulate duplicate samples.
    If the inputs have different lengths, only the leading samples that are
    present in all seven sequences are transformed.

    Args:
        ax, ay, az: Sequences of acceleration components.
        qw, qx, qy, qz: Sequences of quaternion components (scalar part first).

    Returns:
        None. Results are stored in ``accel_n``, ``accel_e``, ``accel_d``.
    """
    # Copy the inputs up front so the loop works on a fixed set of samples
    # even if the source buffers receive new data while it runs.
    columns = [list(seq) for seq in (ax, ay, az, qw, qx, qy, qz)]

    north, east, down = [], [], []
    # zip() stops at the shortest column, which avoids indexing past the end
    # of the quaternion buffers when fewer orientation samples have arrived.
    for x, y, z, q0, q1, q2, q3 in zip(*columns):
        rotation = np.array([
            [q0*q0 + q1*q1 - q2*q2 - q3*q3, 2*(q1*q2 - q3*q0), 2*(q1*q3 + q2*q0)],
            [2*(q1*q2 + q3*q0), q0*q0 - q1*q1 + q2*q2 - q3*q3, 2*(q2*q3 - q1*q0)],
            [2*(q1*q3 - q2*q0), 2*(q2*q3 + q1*q0), q0*q0 - q1*q1 - q2*q2 + q3*q3],
        ])
        n, e, d = rotation.dot(np.array([x, y, z]))  # x,y,z => North, East, Down
        north.append(n)
        east.append(e)
        down.append(d)

    # Swap in the freshly computed values only once everything is ready.
    for buffer, values in ((accel_n, north), (accel_e, east), (accel_d, down)):
        buffer.clear()
        buffer.extend(values)
    return
@app.callback(Output("live_graph1", "figure"), Input("counter1", "n_intervals"))
def update_graph(_counter):
    """Build the figure for ``live_graph1`` from the rotated acceleration.

    Runs ``transformed_axes`` on the raw buffers first, then plots
    ``accel_n``, ``accel_e`` and ``accel_d`` against ``time``.

    Args:
        _counter: Tick count of the ``counter1`` interval (unused).

    Returns:
        A dict with ``"data"`` (three Scatter traces) and ``"layout"``.
    """
    transformed_axes(accel_x, accel_y, accel_z, q_w, q_x, q_y, q_z)

    traces = []
    for series, label in ((accel_n, "N"), (accel_e, "E"), (accel_d, "D")):
        traces.append(go.Scatter(x=list(time), y=list(series), name=label))

    layout = go.Layout(
        xaxis={"type": "date"},
        yaxis={"title": "Acceleration ms<sup>-2</sup>"},
    )
    return {"data": traces, "layout": layout}
# NOTE: "live_graph2.figure" must be the output of exactly one callback.
# A second callback registered on the same output (previously a redefinition
# of ``update_graph`` driven by "counter1") is rejected by Dash as a duplicate
# callback output, so the raw-data graph is served by this callback alone.
@app.callback(Output("live_graph2", "figure"), Input("counter2", "n_intervals"))
def update_graph2(_counter):
    """Build the figure for ``live_graph2`` from the raw acceleration buffers.

    Plots ``accel_x``, ``accel_y`` and ``accel_z`` against ``time``. Once at
    least one timestamp exists, the x-axis range is pinned to the span of the
    buffered timestamps and the y-axis range is fixed to ``[-100, 100]``.

    Args:
        _counter: Tick count of the ``counter2`` interval (unused).

    Returns:
        A dict with ``"data"`` (three Scatter traces) and ``"layout"``.
    """
    # One snapshot so the traces and the axis range agree with each other.
    timestamps = list(time)

    data2 = [
        go.Scatter(x=timestamps, y=list(d), name=name)
        for d, name in zip([accel_x, accel_y, accel_z], ["x", "y", "z"])
    ]
    graph2 = {
        "data": data2,
        "layout": go.Layout(
            {
                "xaxis": {"type": "date"},
                "yaxis": {"title": "Acceleration ms<sup>-2</sup>"},
            }
        ),
    }
    if timestamps:  # cannot adjust plot ranges until there is at least one data point
        graph2["layout"]["xaxis"]["range"] = [min(timestamps), max(timestamps)]
        # Fixed y-range keeps the plot from rescaling on every update.
        graph2["layout"]["yaxis"]["range"] = [-100, 100]
    return graph2
@server.route("/data", methods=["POST"])
def data():  # listens to the data streamed from the sensor logger
    """Receive a JSON batch of sensor samples and append it to the buffers.

    The request body is expected to be a JSON object with a ``"payload"``
    list. Entries named ``"totalacceleration"`` contribute a timestamp
    (``"time"`` divided by 1e9 and passed to ``datetime.fromtimestamp``) and
    ``values.x/y/z``; entries named ``"orientation"`` contribute
    ``values.qx/qy/qz/qw``. Other entries are ignored.

    The whole batch is parsed before any buffer is touched, so a malformed
    entry cannot leave the parallel buffers at different lengths.

    Returns:
        ``"success"`` when the batch was stored, or
        ``("malformed payload", 400)`` when the body cannot be parsed.
    """
    # The route only accepts POST, so no method check is needed here.
    print(f'received data: {request.data}')

    accel_rows = []
    quat_rows = []
    try:
        for d in json.loads(request.data)["payload"]:
            name = d.get('name')
            if name == 'totalacceleration':
                values = d["values"]
                ts = datetime.fromtimestamp(d["time"] / 1_000_000_000)
                accel_rows.append((ts, values["x"], values["y"], values["z"]))
            elif name == 'orientation':
                values = d["values"]
                quat_rows.append(
                    (values["qx"], values["qy"], values["qz"], values["qw"])
                )
    except (ValueError, KeyError, TypeError, AttributeError, OverflowError):
        # Invalid JSON, wrong container types, or missing/invalid fields.
        return "malformed payload", 400

    for ts, x, y, z in accel_rows:
        time.append(ts)
        accel_x.append(x)
        accel_y.append(y)
        accel_z.append(z)
    for qx, qy, qz, qw in quat_rows:
        q_x.append(qx)
        q_y.append(qy)
        q_z.append(qz)
        q_w.append(qw)
    return "success"
if __name__ == "__main__":
    # Start the server only when this file is run as a script; host 0.0.0.0
    # listens on all network interfaces.
    app.run_server(host="0.0.0.0", port=8000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment