Skip to content

Instantly share code, notes, and snippets.

@brocksprogramming
Last active August 24, 2020 04:16
Show Gist options
  • Save brocksprogramming/fa9bacb654e418793f67088cc3ffbbf3 to your computer and use it in GitHub Desktop.
Save brocksprogramming/fa9bacb654e418793f67088cc3ffbbf3 to your computer and use it in GitHub Desktop.
MLGlovesMLModel1.1.py
'''
This software was made in hopes of pioneering machine learning gloves. The way the program is set up, the main application is for typing on any hard surface without a keyboard.
Copyright (C) 2020 Brock Sterling Lynch
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
You may reach the author of this software at brocksprogramming@gmail.com
'''
# Tensorflow v1.5 was used in this project
import pandas as pd
import tensorflow as tf
import numpy as np
import keras
import serial
import threading
import re
from keras.models import model_from_json
import numpy
from numpy import array
from numpy import reshape
import queue
# Queue through which the serial reader threads hand their result (a list of
# readings, or False for a rejected sample) back to the prediction loop.
que = queue.Queue()

# How to connect the gloves to the fingers:
#   green board: sensors 1-5, starting at the pinky
#   red board:   sensors 1-5, starting at the thumb

# Only referenced by commented-out code in read_from_serial.
BYTES_TO_READ = 23

# HC-06 (RED) board
ser0 = serial.Serial(
    "COM9",
    1200,
    bytesize=serial.EIGHTBITS,
    timeout=None,
    parity=serial.PARITY_NONE,
    rtscts=1,
)

# RN (GREEN) board. Stop bits are not passed explicitly (the author's note
# says they were changed to 1).
ser1 = serial.Serial(
    "COM10",
    1200,
    bytesize=serial.EIGHTBITS,
    timeout=None,
    parity=serial.PARITY_NONE,
    rtscts=1,
)
# Load the recorded training data. Column 0 is used as the label and columns
# 1-10 as the ten input features (presumably one per glove sensor -- confirm
# against the CSV layout).
dataset = pd.read_csv('directory')
X = dataset.iloc[:, 1:11].values
y = dataset.iloc[:, 0].values
# Taking care of missing data
from sklearn.preprocessing import Imputer
imputer = Imputer(missing_values = 'NaN', strategy = 'mean', axis = 0)
# X already holds only the ten feature columns, so the imputer must see all of
# it. The previous X[:, 1:11] slice skipped the first feature column, leaving
# any NaN in it to flow into the scaler and the network.
imputer = imputer.fit(X)
X = imputer.transform(X)
# dataset starts at 1 due to columns from openoffice formatting, I suppose
# Encoding categorical data
# Encoding the Dependent Variable
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
# Map the raw labels to integer class ids; the fitted encoder is reused later
# to turn predicted class ids back into labels.
labelencoder_y = LabelEncoder()
y = labelencoder_y.fit_transform(y)
# May need to only go this far in the encoding
# One-hot encode the class ids so they match the softmax output layer.
onehotencoder = OneHotEncoder(sparse=False)
y = y.reshape(len(y), 1)
y = onehotencoder.fit_transform(y)
# Splitting the dataset into the Training set and Test set
from sklearn.model_selection import train_test_split
# Consider using another random_state seed when training the next time
# Changed random_state to 42
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)
# Feature Scaling
from sklearn.preprocessing import StandardScaler
# The scaler is fitted on the training split only; the same fitted scaler is
# applied to live sensor readings before prediction.
sc_X = StandardScaler()
X_train = sc_X.fit_transform(X_train)
X_test = sc_X.transform(X_test)
# Importing keras dependencies
import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.models import model_from_json

# Feed-forward network: 10 inputs -> 17 (relu) -> 17 (relu) -> 28 (softmax).
classifier = Sequential([
    Dense(17, kernel_initializer='uniform', activation='relu', input_dim=10),
    Dense(17, kernel_initializer='uniform', activation='relu'),
    Dense(28, kernel_initializer='uniform', activation='softmax'),
])
classifier.compile(
    optimizer="adam",
    loss="categorical_crossentropy",
    metrics=['accuracy'],
)

# Alternative kept for reference: load json and create model
#json_file = open('model.json', 'r')
#loaded_model_json = json_file.read()
#json_file.close()
#loaded_model = model_from_json(loaded_model_json)

# Load previously trained weights into the freshly built model.
classifier.load_weights("directory")
print("Loaded model from disk")

# Alternative kept for reference: load json and create model
#json_file = open('directory', 'r')
#loaded_model_json = json_file.read()
#json_file.close()
#classifier = model_from_json(loaded_model_json)
#print("Loaded model from disk")

# Training call kept for reference:
#classifier.fit(X_train,y_train,batch_size=10,nb_epoch=1000)

# Keep a handle on the current default graph; main() re-enters it before
# calling the classifier.
graph = tf.get_default_graph()

# Kept for reference: serialize model to JSON
#model_json = classifier.to_json()
#with open("directory", "w") as json_file:
# json_file.write(model_json)
# serialize weights to HDF5
#classifier.save_weights("directory")
#print("Saved model to disk")
#BEGINNING OF ADDED CODE
# Class to do the important procedures on the serial data
class serprocedure():
    """Parse one serial payload and pool its sensor tokens with the other board's.

    Instantiating the class does all the work: the payload is scanned for
    five analog tokens (A0-A4 when ``flag`` is 0, A5-A9 otherwise) and the
    matches are appended to the class-level ``sensorlist``. When the
    instance with ``flag == 1`` finds exactly ten pooled tokens and no
    missing sensor, the numeric values are extracted and put on the
    module-level ``que``; otherwise that instance discards the pooled
    state and puts ``False`` on ``que``. An instance with ``flag == 0``
    never puts anything on the queue.

    NOTE(review): the pooled state is shared by every instance and, in this
    file, is updated from two threads. The result therefore depends on the
    flag-0 payload being processed before the flag-1 payload -- confirm the
    intended ordering with the caller.
    """

    # Class-level state shared across all instances.
    # Tokens such as "A012" collected from both boards for the current sample.
    sensorlist = []
    # A list to store the missing data (not written to by this class).
    missingdata = []
    # Number of sensors that could not be found in the current sample.
    mcounter = 0
    # Number of complete samples successfully exported so far.
    times_counter = 0

    def __init__(self, serdata, flag):
        """Scan ``serdata`` and, for the second board, export or discard.

        serdata: raw payload read from the serial port.
        flag:    0 for the first board (sensors 0-4), anything else for the
                 second board (sensors 5-9).
        """
        self.serdata = serdata
        self.flag = flag
        self.serdatalooper(self.flag)
        # Only the second board (flag 1) decides the fate of the sample: it
        # needs no missing sensors and exactly ten pooled tokens.
        if (self.flag == 1 and serprocedure.mcounter < 1
                and len(serprocedure.sensorlist) == 10):
            # Tell the user that the program is performing further extraction
            print("Performing further extraction of data and exportation")
            self.furtherextraction(serprocedure.sensorlist)
            serprocedure.times_counter = serprocedure.times_counter + 1
        # It will only do this, if it's not the first serial COM with flag 0
        elif self.flag != 0:
            # Reset the shared state so a bad sample does not build up
            serprocedure.mcounter = 0
            serprocedure.sensorlist.clear()
            print("Throwing away partial or excess result.")
            que.put(False)

    # Method to extract the individual parts of the serial data
    def extractanalog(self, analognumber, flag):
        """Find the token for sensor ``analognumber`` in the payload.

        Appends the first match of ``A<analognumber>`` followed by one or
        two digits to ``sensorlist``; if there is no match, increments
        ``mcounter`` instead. ``flag`` is accepted but not used.
        """
        # Raw string for the \d fragment: the non-raw form is an invalid
        # escape sequence and triggers a warning on current Python versions.
        found = re.search(r'(A' + str(analognumber) + r'\d{1,2})',
                          str(self.serdata))
        if found is not None:
            serprocedure.sensorlist.append(str(found.group()))
        else:
            serprocedure.mcounter += 1

    def furtherextraction(self, newlist):
        """Convert tokens to integers, clear the pool and queue the values.

        newlist: iterable of tokens such as "A012"; the digits after the
                 leading ``A<d>`` are parsed as the reading.
        """
        # ?<= looks behind and only matches what is ahead of "A<digit>"
        findanalogvalue = [
            int(re.search(r'((?<=A\d{1})\d{1,2})', token).group())
            for token in newlist
        ]
        print(findanalogvalue)
        # The values are already copied out, so the shared pool can be reset
        # (newlist may be the very same list object).
        serprocedure.sensorlist.clear()
        # Return the analog values to the prediction loop
        que.put(findanalogvalue)

    def serdatalooper(self, flag):
        """Extract the five sensors belonging to this board, then sort the pool.

        flag: 0 selects sensors 0-4; any other value selects sensors 5-9.
        """
        first = 0 if flag == 0 else 5
        for analognumber in range(first, first + 5):
            self.extractanalog(analognumber, "mainlist")
        # Sort the list so tokens end up in sensor order
        serprocedure.sensorlist.sort()
# read from serial port
def read_from_serial(serial,board,flag):
    """Read one chunk from a board's serial port and hand it to serprocedure.

    serial: port object providing ``read_until`` (the parameter name shadows
            the pyserial module inside this function).
    board:  human-readable board name; not used by the body.
    flag:   board identifier forwarded unchanged to serprocedure.
    """
    # An earlier byte-at-a-time loop driven by BYTES_TO_READ was replaced by
    # this single call, which the author found caught more of the data.
    # NOTE(review): the terminator is passed as str, not bytes. On Python 3
    # it presumably never matches the received bytes, so the call would only
    # return once 40 bytes have arrived -- confirm before changing it, since
    # that would alter how much data each read returns.
    chunk = serial.read_until('\n',40)
    payload = b'' + chunk
    # Constructing the object performs the parsing and queueing.
    serprocedure(payload,flag)
def main():
    """Endlessly read both gloves, then classify the combined sample.

    Each pass starts one reader thread per serial port, flushes both ports'
    buffers, waits for the readers, and takes one item from ``que``: a list
    of readings is scaled and classified, ``False`` is reported as a
    rejected sample.
    """
    while True:
        # One reader per board; the trailing flag tells serprocedure which
        # board the payload came from.
        readers = [
            threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)",0)),
            threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)",1)),
        ]
        for reader in readers:
            reader.start()
        # Flush the serial input and output buffers. The joins were
        # deliberately placed after the flushes by the author.
        for port in (ser0, ser1):
            port.reset_input_buffer()
            port.reset_output_buffer()
        # wait for all threads termination
        for reader in readers:
            reader.join()
        with graph.as_default():
            # Be careful, this is blocking.
            analog_to_predict = que.get()
            if analog_to_predict is False:
                print("que.get() was False")
                continue
            # The input had to be scaled in order for it to work somewhat better
            scaled = sc_X.transform(numpy.array([analog_to_predict]))
            class_ids = classifier.predict_classes(scaled)
            new_pred = labelencoder_y.inverse_transform(class_ids)
            print(new_pred)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment