Created
April 7, 2022 18:05
-
-
Save Taremeh/29dd165e2f7ad4fcee7f2062bad7e2f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import time | |
import asyncio | |
from datetime import datetime | |
import websockets # from websockets import WebSocketServer, WebSocketServerProtocol, ConnectionClosedError, ConnectionClosedOK | |
from functools import partial | |
import json | |
from response_object import ResponseObject | |
import redis | |
import random | |
# Websocket connections currently being served; maintained by
# register_websocket() / unregister_websocket().
CLIENTS: set = set()

# Not read or written anywhere else in the visible code.
messageReceivedTime = None

# Last value requested per actuator id; filled by put_actuator_value().
actuatorTargets: dict = {}

# Histories of the navigation output, one list per axis. Each starts with a
# single 0 sample and is extended by getMockupSensorValue()/getDatabaseValue().
mcu_navigation_output_data_x_arr: list = [0]
mcu_navigation_output_data_y_arr: list = [0]
mcu_navigation_output_data_z_arr: list = [0]
def get_manifest(path='./manifest.json'):
    """Load the manifest file and return its parsed JSON content.

    Args:
        path: Location of the manifest. Defaults to ``./manifest.json``
            (relative to the current working directory).

    Returns:
        The decoded JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # JSON is exchanged as UTF-8; do not depend on the platform's locale
    # encoding when reading it.
    with open(path, encoding='utf-8') as f:
        return json.load(f)
# Parsed once when the module is loaded; the data helpers below look up the
# "sensors", "actuators", "mcu" and "config" sections in this object.
manifest = get_manifest()
async def websocket_handler(websocket: websockets.WebSocketServer, path: str = ""):
    """Serve a single client connection until it is closed.

    Each incoming frame is expected to be a JSON object with a "type" field:

    * ``"data_request"``     - replies with the simulated sensor data for
      ``message["category"]``, serialised from ``ResponseObject.__dict__``.
    * ``"actuator_put"``     - stores ``message["value"]`` as the target of
      ``message["actuator_id"]``; no reply is sent.
    * ``"manifest_request"`` - re-reads the manifest and sends it back with
      ``"is_manifest": true`` added.

    Frames with any other type are ignored. Frames that are not valid JSON
    objects are reported on stdout and skipped.

    Args:
        websocket: The connection handed over by ``websockets.serve``.
        path: Request path. Unused; it has a default because newer
            ``websockets`` releases call the handler with the connection only.
    """
    json_data_response: str

    register_websocket(websocket)
    try:
        while True:
            try:
                message = json.loads(await websocket.recv())
            except ValueError as e:
                # Covers json.JSONDecodeError and undecodable binary frames.
                print(f"Ignoring malformed message: {e}")
                continue
            print(f"< {message}")

            if not isinstance(message, dict):
                print("Ignoring message that is not a JSON object")
                continue
            message_type = message.get("type")

            if message_type == "data_request":  # get_db_sensor_data
                # SIMULATE DATA
                # use ResponseObject.__dict__ to get a dictionary representation
                # that json.dumps can turn into a string
                json_data_response = json.dumps(get_sensor_data(message["category"]).__dict__)

                # REAL DB DATA
                # json_data_response = json.dumps(get_db_sensor_data(message["category"]).__dict__)

                print(f"> {json_data_response}")
                await websocket.send(json_data_response)
            elif message_type == "actuator_put":
                put_actuator_value(message["actuator_id"], message["value"])
            elif message_type == "manifest_request":
                # Local name on purpose: do not shadow the module-level manifest.
                fresh_manifest = get_manifest()
                fresh_manifest["is_manifest"] = True
                await websocket.send(json.dumps(fresh_manifest))
    except websockets.ConnectionClosedOK:
        print(f"Terminated OK")
    except websockets.ConnectionClosedError as e:
        print(f"Terminated Error (Client not responding): Code: {e.code}")
    finally:
        # Runs for every exit path, including unexpected exceptions, so a
        # failed connection never stays behind in CLIENTS.
        unregister_websocket(websocket)
def register_websocket(websocket: websockets.WebSocketServer):
    """Add *websocket* to the module-level CLIENTS set."""
    CLIENTS.add(websocket)
def unregister_websocket(websocket: websockets.WebSocketServer):
    """Remove *websocket* from the module-level CLIENTS set.

    Calling this for a connection that is not (or no longer) registered is a
    no-op, so cleanup code may call it more than once without raising.
    """
    CLIENTS.discard(websocket)
def get_sensor_data(category) -> ResponseObject:
    """Build a response holding one simulated value per manifest entry.

    Every key of the manifest sections "sensors", "actuators", "mcu" and
    "config" (visited in that order) is mapped to the value returned by
    getMockupSensorValue(). The collected mapping is printed and wrapped in a
    ResponseObject together with an empty actuator-target mapping.

    Args:
        category: Currently not used by this function.
    """
    # ToDo: lookup sensor values
    readings = {}
    for section_name in ("sensors", "actuators", "mcu", "config"):
        for entry_key in manifest[section_name]:
            readings[entry_key] = getMockupSensorValue(entry_key)

    # Actuator targets (actuatorTargets) are not reported yet, so the second
    # argument is always an empty dict.
    print(readings)
    return ResponseObject(readings, {})
def get_db_sensor_data(category) -> ResponseObject:
    """Build a response holding one database value per manifest entry.

    Every key of the manifest sections "sensors", "actuators", "mcu" and
    "config" (visited in that order) is mapped to the value returned by
    getDatabaseValue(). The collected mapping is printed and wrapped in a
    ResponseObject together with an empty actuator-target mapping.

    Args:
        category: Currently not used by this function.
    """
    # ToDo: lookup sensor values
    readings = {}
    for section_name in ("sensors", "actuators", "mcu", "config"):
        for entry_key in manifest[section_name]:
            readings[entry_key] = getDatabaseValue(entry_key)

    # Actuator targets (actuatorTargets) are not reported yet, so the second
    # argument is always an empty dict.
    print(readings)
    return ResponseObject(readings, {})
def put_actuator_value(actuator_id: str, actuator_value: float):
    """Remember *actuator_value* as the requested target of *actuator_id*.

    The value is only stored in the module-level actuatorTargets mapping,
    replacing any earlier target for the same id. Returns None.
    """
    # ToDo: set actuator value
    actuatorTargets.update({actuator_id: actuator_value})
def setupMockDatabase():
    """Seed `db` with a starting value for every key listed in the manifest.

    For each manifest section the section name and every entry (as JSON) are
    printed. Each entry key is then written to `db` with the entry's
    "range_min" when it has one, otherwise 0.
    """
    for section, entries in manifest.items():
        print(f"--------------\n{section}")
        for key in entries:
            spec = entries[key]
            print(f" > {key}: {json.dumps(spec)}")
            # set keys with default values
            initial_value = spec["range_min"] if "range_min" in spec else 0
            db.set(key, initial_value)
def getMockupSensorValue(key):
    """Return a simulated reading for the manifest entry *key*.

    Special keys:

    * ``mcu_navigation_output_data_x`` - increases the value stored in `db`
      by 0.01, appends the rounded result to the x history when it differs
      from the last sample, and returns the whole x history list.
    * ``mcu_navigation_output_data_y`` / ``..._z`` - compute a sample from a
      sine of the wall clock. If the axis history is as long as the x history
      the last sample is overwritten, otherwise the sample is appended. The
      whole axis history list is returned.
    * ``mcu_macro_state`` - returns "Macro State 1" or "Macro State 2" at
      random.

    Any other key yields a sine-of-wall-clock value between 0 and ``scale``,
    rounded to two decimals. ``scale`` is the entry's "range_max" from the
    manifest section selected by the key prefix ("sen", "act", "mcu", "con"),
    or 1 when the prefix is unknown or the entry has no "range_max".
    """
    prefix = key[0:3]

    if prefix == "mcu":
        if key == "mcu_navigation_output_data_x":
            stored = float(db.get('mcu_navigation_output_data_x').decode('utf-8'))
            db.set('mcu_navigation_output_data_x', stored + 0.01)
            # The sample is taken from a fresh read of the stored value.
            sample = round(float(db.get('mcu_navigation_output_data_x').decode('utf-8')), 2)
            if sample != mcu_navigation_output_data_x_arr[-1]:
                mcu_navigation_output_data_x_arr.append(sample)
            return mcu_navigation_output_data_x_arr

        if key == "mcu_navigation_output_data_y":
            amplitude = 300
            sample = round((math.sin(time.time()/10)+1)/2 * amplitude - 150, 2)
            history = mcu_navigation_output_data_y_arr
            if len(mcu_navigation_output_data_x_arr) != len(history):
                history.append(sample)
            else:
                history[-1] = sample
            return history

        if key == "mcu_navigation_output_data_z":
            sample = round((math.sin(time.time() / 30) - 1) / 2 * 2, 2)
            history = mcu_navigation_output_data_z_arr
            if len(mcu_navigation_output_data_x_arr) != len(history):
                history.append(sample)
            else:
                history[-1] = sample
            return history

        if key == "mcu_macro_state":
            macro_states = ["Macro State 1", "Macro State 2"]
            return macro_states[random.randint(0, 1)]

    # Generic entries: scale the wave by the entry's range_max, if any.
    section_by_prefix = {
        "sen": "sensors",
        "act": "actuators",
        "mcu": "mcu",
        "con": "config",
    }
    scale = 1
    section = section_by_prefix.get(prefix)
    if section is not None:
        spec = manifest[section][key]
        scale = spec["range_max"] if "range_max" in spec else 1

    return round((math.sin(time.time()/10)+1)/2 * scale, 2)
def _read_db_float(key):
    """Return the number stored in `db` under *key*, rounded to 2 decimals.

    Returns None when the key does not exist (``db.get`` yields None).
    """
    raw = db.get(key)
    if raw is None:
        return None
    return round(float(raw.decode('utf-8')), 2)


def getDatabaseValue(key):
    """Return the current database value for the manifest entry *key*.

    * ``mcu_navigation_output_data_x`` - appends the stored value to the x
      history when it differs from the last sample and returns the whole x
      history list.
    * ``mcu_navigation_output_data_y`` / ``..._z`` - read the axis' own
      stored value. If the axis history is as long as the x history the last
      sample is overwritten, otherwise the value is appended. The whole axis
      history list is returned.
    * ``mcu_macro_state`` - returns the stored text.
    * any other key - returns the key's own stored number rounded to two
      decimals.

    A key that is missing from `db` leaves the histories untouched and
    yields None for scalar entries instead of raising.
    """
    if key == "mcu_macro_state":
        raw = db.get('mcu_macro_state')
        return None if raw is None else raw.decode('utf-8')

    if key == "mcu_navigation_output_data_x":
        value = _read_db_float(key)
        if value is not None and value != mcu_navigation_output_data_x_arr[-1]:
            mcu_navigation_output_data_x_arr.append(value)
        return mcu_navigation_output_data_x_arr

    if key in ("mcu_navigation_output_data_y", "mcu_navigation_output_data_z"):
        if key == "mcu_navigation_output_data_y":
            history = mcu_navigation_output_data_y_arr
        else:
            history = mcu_navigation_output_data_z_arr
        # Each axis reads its own key (the y axis used to read the x key).
        value = _read_db_float(key)
        if value is not None:
            if len(mcu_navigation_output_data_x_arr) == len(history):
                history[-1] = value
            else:
                history.append(value)
        return history

    # Every remaining entry reports its own stored value rather than the
    # x navigation value.
    return _read_db_float(key)
def getActuatorOverrideStatus(act_key):
    """Return the 'telem_override' field stored in the hash *act_key*.

    The field is first reset through setMockupActuatorOverrideStatus(), so
    with the mock in place the stored value is always the one written there.

    Returns:
        The field value as text when `db` returns bytes, otherwise whatever
        `db.hget` returned (None when the field does not exist).
    """
    setMockupActuatorOverrideStatus(act_key)  # Todo: Delete line for Production
    # NOTE(review): setupMockDatabase() writes manifest keys with db.set();
    # confirm that actuator keys are really meant to be hashes as well.
    status = db.hget(act_key, 'telem_override')
    if isinstance(status, bytes):
        return status.decode('utf-8')
    return status
def setMockupActuatorOverrideStatus(act_key):
    """Write 0 into the 'telem_override' field of the hash *act_key* in `db`."""
    override_field = 'telem_override'
    db.hset(act_key, override_field, 0)
if __name__ == '__main__':
    # Module-level handle used by all database helpers in this file.
    db = redis.Redis(host='localhost', port=6379, db=0)
    time.sleep(0.1)  # kept from the original start-up sequence
    #init_helpers(db, 'Telemetry')
    print('Telemetry initialised...')

    async def _serve_forever():
        """Run the websocket server on localhost:5678 until interrupted."""
        async with websockets.serve(websocket_handler, "localhost", 5678):
            # A bare future never completes, so the server keeps running
            # until the process is stopped.
            await asyncio.Future()

    print('Starting Telemetry Websocket...')
    print()
    setupMockDatabase()

    # asyncio.run() creates and closes the event loop itself; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(_serve_forever())  # ToDo: Move blocking loop out of setup function
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment