Skip to content

Instantly share code, notes, and snippets.

@Taremeh
Created April 7, 2022 18:05
Show Gist options
  • Save Taremeh/29dd165e2f7ad4fcee7f2062bad7e2f0 to your computer and use it in GitHub Desktop.
Save Taremeh/29dd165e2f7ad4fcee7f2062bad7e2f0 to your computer and use it in GitHub Desktop.
import math
import time
import asyncio
from datetime import datetime
import websockets # from websockets import WebSocketServer, WebSocketServerProtocol, ConnectionClosedError, ConnectionClosedOK
from functools import partial
import json
from response_object import ResponseObject
import redis
import random
# Connections currently registered: register_websocket() adds to this set and
# unregister_websocket() removes from it.
CLIENTS = set()
# Initialised to None; nothing in the visible code reads or writes it afterwards.
messageReceivedTime = None
# Latest requested value per actuator id; written by put_actuator_value().
actuatorTargets = {}
# Sample histories for the three navigation outputs. Each starts with a single
# 0 and is extended in place (and returned) by getMockupSensorValue() and
# getDatabaseValue(); the y and z lists are kept in step with the x list.
mcu_navigation_output_data_x_arr = [0]
mcu_navigation_output_data_y_arr = [0]
mcu_navigation_output_data_z_arr = [0]
def get_manifest():
    """Read ``./manifest.json`` and return its parsed JSON content.

    The path is resolved against the current working directory and the file
    is read again on every call, so each caller gets a fresh object.
    """
    with open('./manifest.json') as manifest_file:
        return json.load(manifest_file)
# Parsed once when the module is imported. The data and setup helpers in this
# file read this copy; websocket_handler re-reads the file for manifest requests.
manifest = get_manifest()
async def websocket_handler(websocket: websockets.WebSocketServer, path: str):
    """Serve one client connection until it closes.

    The connection is registered, then every incoming message is parsed as
    JSON and dispatched on ``message["type"]``:

    * ``"data_request"``: replies with the JSON-encoded ``__dict__`` of the
      object returned by ``get_sensor_data(message["category"])``.
    * ``"actuator_put"``: passes ``message["actuator_id"]`` and
      ``message["value"]`` to ``put_actuator_value``.
    * ``"manifest_request"``: re-reads the manifest, adds
      ``"is_manifest": True`` and sends it.

    Messages of any other type are ignored.

    The connection is unregistered on every exit path. A clean or abnormal
    close is logged and ends the handler; any other exception (for example
    invalid JSON or a missing key) still propagates, but no longer leaves the
    connection behind in ``CLIENTS``.

    Args:
        websocket: Connection object; only ``recv()`` and ``send()`` are used.
            NOTE(review): annotated as ``WebSocketServer`` although it is used
            as a single connection - confirm the intended type.
        path: Request path supplied by the server; unused.
    """
    register_websocket(websocket)
    try:
        while True:
            message = json.loads(await websocket.recv())
            print(f"< {message}")
            message_type = message["type"]
            if message_type == "data_request":
                # SIMULATE DATA
                # ResponseObject.__dict__ gives a plain dict that json.dumps can encode.
                # For real database values use get_db_sensor_data(message["category"]) instead.
                json_data_response = json.dumps(get_sensor_data(message["category"]).__dict__)
                print(f"> {json_data_response}")
                await websocket.send(json_data_response)
            elif message_type == "actuator_put":
                put_actuator_value(message["actuator_id"], message["value"])
            elif message_type == "manifest_request":
                # Local name on purpose: keeps the module-level manifest untouched.
                fresh_manifest = get_manifest()
                fresh_manifest["is_manifest"] = True
                await websocket.send(json.dumps(fresh_manifest))
    except websockets.ConnectionClosedOK:
        print("Terminated OK")
    except websockets.ConnectionClosedError as e:
        print(f"Terminated Error (Client not responding): Code: {e.code}")
    finally:
        # Runs for closed connections and for unexpected errors alike, so a
        # failed handler never leaves a dead socket registered.
        unregister_websocket(websocket)
def register_websocket(websocket: websockets.WebSocketServer):
    """Add *websocket* to the module-level ``CLIENTS`` set.

    Registering the same object twice has no further effect because
    ``CLIENTS`` is a set.
    """
    CLIENTS.add(websocket)
def unregister_websocket(websocket: websockets.WebSocketServer):
    """Remove *websocket* from the module-level ``CLIENTS`` set.

    Unregistering a connection that is not (or no longer) registered is a
    no-op, so cleanup code may call this more than once without raising
    ``KeyError``.
    """
    CLIENTS.discard(websocket)
def get_sensor_data(category) -> ResponseObject:
    """Build a response holding a simulated value for every manifest entry.

    The keys of the manifest sections ``"sensors"``, ``"actuators"``,
    ``"mcu"`` and ``"config"`` are visited in that order and each is mapped
    to ``getMockupSensorValue(key)``. The resulting dict is printed and then
    wrapped, together with an empty actuator-target dict, in a
    ``ResponseObject``.

    Args:
        category: Accepted for the caller's sake; currently unused.
    """
    # ToDo: lookup sensor values
    section_names = ("sensors", "actuators", "mcu", "config")
    data_points = {
        entry_key: getMockupSensorValue(entry_key)
        for section_name in section_names
        for entry_key in manifest[section_name]
    }
    # Actuator targets are not filled in yet; an empty mapping is sent.
    actuator_targets = {}
    print(data_points)
    return ResponseObject(data_points, actuator_targets)
def get_db_sensor_data(category) -> ResponseObject:
    """Build a response holding the database value of every manifest entry.

    Walks the manifest sections ``"sensors"``, ``"actuators"``, ``"mcu"`` and
    ``"config"`` in that order, stores ``getDatabaseValue(key)`` for each key,
    prints the collected dict and returns it inside a ``ResponseObject``
    alongside an empty actuator-target dict.

    Args:
        category: Accepted for the caller's sake; currently unused.
    """
    # ToDo: lookup sensor values
    data_points = {}
    for section_name in ("sensors", "actuators", "mcu", "config"):
        data_points.update(
            (entry_key, getDatabaseValue(entry_key))
            for entry_key in manifest[section_name]
        )
    # Actuator targets are not filled in yet; an empty mapping is sent.
    actuator_targets = {}
    print(data_points)
    return ResponseObject(data_points, actuator_targets)
def put_actuator_value(actuator_id: str, actuator_value: float):
    """Remember *actuator_value* as the requested target for *actuator_id*.

    The value is stored in the module-level ``actuatorTargets`` dict and
    replaces any earlier target for the same id. Returns ``None``.
    """
    # ToDo: set actuator value
    actuatorTargets.update({actuator_id: actuator_value})
def setupMockDatabase():
    """Seed the database with a default value for every manifest entry.

    For each top-level manifest section a header is printed, followed by one
    line per entry showing its JSON-encoded specification. Each entry key is
    then written to ``db`` with the entry's ``"range_min"`` when present and
    ``0`` otherwise.
    """
    for section in manifest:
        print(f"--------------\n{section}")
        entries = manifest[section]
        for key in entries:
            spec = entries[key]
            print(f" > {key}: {json.dumps(spec)}")
            # set keys with default values
            if "range_min" in spec:
                default_value = spec["range_min"]
            else:
                default_value = 0
            db.set(key, default_value)
def getMockupSensorValue(key):
    """Return a simulated value for the manifest entry *key*.

    Four ``mcu`` keys are handled specially:

    * ``mcu_navigation_output_data_x``: the stored database value is raised
      by 0.01, read back and rounded to two decimals; it is appended to the
      x history when it differs from the newest sample, and the history list
      is returned.
    * ``mcu_navigation_output_data_y`` / ``..._z``: a time-based sine sample
      is written to the respective history, which is kept no longer than the
      x history, and the history list is returned.
    * ``mcu_macro_state``: one of two fixed state names, picked at random.

    Every other key yields a sine wave over time in ``[0, scale]``, rounded
    to two decimals, where ``scale`` is the entry's ``"range_max"`` in the
    manifest section selected by the key's three-letter prefix (``sen``,
    ``act``, ``mcu``, ``con``) and ``1`` when there is none.
    """
    def follow_x_history(history, sample):
        # Overwrite the newest sample while the list is as long as the x
        # history; otherwise grow by one so it catches up with x.
        if len(mcu_navigation_output_data_x_arr) == len(history):
            history[-1] = sample
        else:
            history.append(sample)
        return history

    prefix = key[0:3]

    if prefix == "mcu":
        if key == "mcu_navigation_output_data_x":
            x_key = 'mcu_navigation_output_data_x'
            stored = float(db.get(x_key).decode('utf-8'))
            db.set(x_key, stored + 0.01)
            # Read back what the database now holds rather than reusing the local sum.
            latest = round(float(db.get(x_key).decode('utf-8')), 2)
            if latest != mcu_navigation_output_data_x_arr[-1]:
                mcu_navigation_output_data_x_arr.append(latest)
            return mcu_navigation_output_data_x_arr
        if key == "mcu_navigation_output_data_y":
            span = 300
            sample = round((math.sin(time.time()/10)+1)/2 * span - 150, 2)
            return follow_x_history(mcu_navigation_output_data_y_arr, sample)
        if key == "mcu_navigation_output_data_z":
            sample = round((math.sin(time.time() / 30) - 1) / 2 * 2, 2)
            return follow_x_history(mcu_navigation_output_data_z_arr, sample)
        if key == "mcu_macro_state":
            return ["Macro State 1", "Macro State 2"][random.randint(0, 1)]

    # Generic case: pick the manifest section from the key prefix.
    sections = (("sen", "sensors"), ("act", "actuators"), ("mcu", "mcu"), ("con", "config"))
    section_name = next((name for tag, name in sections if prefix == tag), None)
    scale = 1
    if section_name is not None:
        entry = manifest[section_name][key]
        if "range_max" in entry:
            scale = entry["range_max"]
    return round((math.sin(time.time()/10)+1)/2 * scale, 2)
def getDatabaseValue(key):
    """Return the value currently stored in the database for *key*.

    * ``mcu_navigation_output_data_x``: the stored number (rounded to two
      decimals) is appended to the x history when it differs from the newest
      sample; the history list is returned.
    * ``mcu_navigation_output_data_y`` / ``..._z``: the number stored under
      that key itself is written to the respective history, which is kept no
      longer than the x history; the history list is returned.
    * ``mcu_macro_state``: the stored text.
    * any other key: the number stored under that key, rounded to two
      decimals, or the stored text when it is not numeric.

    A key that is missing from the database yields ``None`` (for the history
    keys the history is returned unchanged).

    Fixes over the previous version: the y history no longer reads the x key,
    other ``mcu`` keys no longer return the x value, and non-``mcu`` keys are
    actually looked up instead of always returning ``None``.
    """
    def read_text(name):
        # Stored values come back as bytes; None means the key does not exist.
        raw = db.get(name)
        return None if raw is None else raw.decode('utf-8')

    def follow_x_history(history, sample):
        # Overwrite the newest sample while the list is as long as the x
        # history; otherwise grow by one so it catches up with x.
        if len(mcu_navigation_output_data_x_arr) == len(history):
            history[-1] = sample
        else:
            history.append(sample)

    if key == "mcu_navigation_output_data_x":
        text = read_text(key)
        if text is not None:
            sample = round(float(text), 2)
            if sample != mcu_navigation_output_data_x_arr[-1]:
                mcu_navigation_output_data_x_arr.append(sample)
        return mcu_navigation_output_data_x_arr

    follower_histories = {
        "mcu_navigation_output_data_y": mcu_navigation_output_data_y_arr,
        "mcu_navigation_output_data_z": mcu_navigation_output_data_z_arr,
    }
    if key in follower_histories:
        history = follower_histories[key]
        text = read_text(key)
        if text is not None:
            follow_x_history(history, round(float(text), 2))
        return history

    if key == "mcu_macro_state":
        return read_text(key)

    text = read_text(key)
    if text is None:
        return None
    try:
        return round(float(text), 2)
    except ValueError:
        # Non-numeric entries are passed through as text instead of failing.
        return text
def getActuatorOverrideStatus(act_key):
    """Return the ``telem_override`` field stored in the hash *act_key*.

    The mock setter is called first, so at present the field is always
    rewritten to ``0`` before it is read. The stored value is returned as
    text (bytes are decoded as UTF-8), or ``None`` when the field does not
    exist. Previously the looked-up value was discarded and ``None`` was
    always returned.
    """
    setMockupActuatorOverrideStatus(act_key)  # Todo: Delete line for Production
    raw = db.hget(act_key, 'telem_override')
    if isinstance(raw, bytes):
        return raw.decode('utf-8')
    return raw
def setMockupActuatorOverrideStatus(act_key):
    """Write the placeholder value ``0`` to the ``telem_override`` field of the hash *act_key*."""
    override_field = 'telem_override'
    db.hset(act_key, override_field, 0)
if __name__ == '__main__':
    # Module-level Redis handle used by the database helpers in this file.
    # It only exists when the file is run as a script.
    db = redis.Redis(host='localhost', port=6379, db=0)
    time.sleep(0.1)
    #init_helpers(db, 'Telemetry')
    print('Telemetry initialised...')

    server = websockets.serve(websocket_handler, "localhost", 5678)
    print('Starting Telemetry Websocket...')
    print()

    # Seed default values before the first client can request data.
    setupMockDatabase()

    # NOTE(review): asyncio.get_event_loop() without a running loop is
    # deprecated in recent Python versions - consider asyncio.run().
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(server)
    event_loop.run_forever()  # ToDo: Move blocking loop out of setup function
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment