Skip to content

Instantly share code, notes, and snippets.

@elliotwoods
Created September 6, 2017 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elliotwoods/0cca512a9708911879177fc2714c2a8e to your computer and use it in GitHub Desktop.
Save elliotwoods/0cca512a9708911879177fc2714c2a8e to your computer and use it in GitHub Desktop.
from KC31Node import app
from KC31Node import serialDevice
import sys
import time
from IPython import embed
import json
from flask import jsonify
from threading import Lock, Timer
from numpy import array, mean, std
import matplotlib.pyplot as plt
from scipy import stats
class MotionReadings:
def __init__(self, content):
self.data = array(content['data'])
self.mean = mean(self.data, axis=0)
self.trimmedMean = stats.trim_mean(self.data, 0.1, axis=0) # trim 20% of data
self.std = std(self.data, axis=0)
self.duration = content['duration']
self.count = len(content['data'])
class Axis:
def __init__(self, serialDevice):
self.serialDevice = serialDevice
self.serialDevice.listeners.append(self)
self.onInitialiseIndex = None
self.incomingMotionReadingsLock = Lock()
self.incomingMotionReadings = []
self.clear()
self.receiveHandlers = {
"axisID" : self._receiveAxisID,
"reportStatus" : self._receiveStatus,
"accelRx" : self._receiveMotionReadings
}
def initialize(self):
self.clear();
return self.serialDevice.open()
def clear(self):
self.serialDevice.close()
self.index = None
self.initializeTimer = None
self.deviceStatus = {}
def getMotionReading(self, readingCount = 1000, timeout = 10.0):
if not self.index is 2:
raise Exception("Cannot take motion readings for axisID " + str(self.axisID))
#clear the response buffer
self.incomingMotionReadings = []
print("Sending request for " + str(readingCount) + " readings")
#send the request
self.send({
"app" : {
"motionSensor" : {
"takeReadings" : int(readingCount)
}
}
})
#wait for the response to arrive
beginTime = time.time()
hasData = False
while not hasData:
time.sleep(0.1)
self.incomingMotionReadingsLock.acquire()
try :
hasData = not len(self.incomingMotionReadings) is 0
except:
raise
finally:
self.incomingMotionReadingsLock.release()
if time.time() - beginTime > timeout:
raise Exception("Timeout taking readings")
print("Request complete in " + str(time.time() - beginTime) + " seconds")
if not hasData:
raise AxisException("Timeout waiting for motion readings")
self.incomingMotionReadingsLock.acquire()
data = None
try:
data = self.incomingMotionReadings[-1]
self.incomingMotionReadings = []
except:
raise
finally:
self.incomingMotionReadingsLock.release()
return data
def receive(self, message):
if type(message) is dict:
for key, value in message.items():
if key in self.receiveHandlers.keys():
self.receiveHandlers[key](value)
else:
print(json.dumps({ key : value }))
def send(self, message):
self.serialDevice.sendObject(message)
@property
def report(self):
return {
'deviceStatus': self.deviceStatus,
'index': self.index
};
def _receiveAxisID(self, content):
try:
self.index = int(content)
if not self.onInitialiseIndex is None:
self.onInitialiseIndex(self, self.index)
except Exception as e:
print("Can't set index to " + str(content) + " : " + str(e));
def _receiveStatus(self, content):
self.deviceStatus = content
def _receiveMotionReadings(self, content):
motionReadings = MotionReadings(content)
self.incomingMotionReadingsLock.acquire()
try:
self.incomingMotionReadings.append(motionReadings)
except:
raise
finally:
self.incomingMotionReadingsLock.release()
class Axes:
def __init__(self):
self.clear()
self.initialize()
def _assignAxis(self, axis, index):
self.axesLock.acquire()
try:
self.axes[index] = axis
finally:
self.axesLock.release()
self.uninitializedAxesLock.acquire()
try:
self.uninitializedAxes.remove(axis)
finally:
self.uninitializedAxesLock.release()
def initialize(self):
self.clear()
serialPorts = serialDevice.listSerialDevices()
print("Found unfiltered serial devices : " + str(serialPorts))
# filter based on known serial ports
filters = ['CH340']
print(str(filters))
filteredList = [x for x in serialPorts if any(
filter in x.description for filter in filters)]
print("Found relevant serial devices : " + str(filteredList))
# add Axis objects to unitialized axis list
for serialPort in filteredList:
device = serialDevice.SerialDevice(serialPort[0])
axis = Axis(device)
axis.onInitialiseIndex = lambda axis, index: self._assignAxis(axis, index)
axis.initialize()
self.uninitializedAxes.append(axis)
return {
"foundAxes": len(filteredList),
"uninitializedAxes": len(self.uninitializedAxes)
}
def clear(self):
try:
self.uninitializedAxesLock.acquire()
try:
for unitializedAxis in self.uninitializedAxes:
if not unitializedAxis.initializeTimer is None:
unitializedAxis.initializeTimer.cancel()
unitializedAxis.initializeTimer = None
unitializedAxis.clear()
self.uninitializedAxes = []
finally:
self.uninitializedAxesLock.release()
self.axesLock.acquire()
try:
for axis in self.axes.values():
if not axis is None:
axis.clear()
axis = None
self.axes = []
finally:
self.axesLock.release()
except:
pass
self.axesLock = Lock()
self.axes = {
1: None,
2: None
}
self.uninitializedAxesLock = Lock()
self.uninitializedAxes = []
return {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment