Last active
August 24, 2020 04:16
-
-
Save brocksprogramming/fa9bacb654e418793f67088cc3ffbbf3 to your computer and use it in GitHub Desktop.
MLGlovesMLModel1.1.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
This software was made in hopes of pioneering machine learning gloves. The way the program is set up, the main application is for typing on any hard surface without a keyboard.

Copyright (C) 2020 Brock Sterling Lynch

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

You may reach the author of this software at brocksprogramming@gmail.com
'''
# Tensorflow v1.5 was used in this project
import pandas as pd
import tensorflow as tf
import numpy as np
import keras
import serial
import threading
import re
from keras.models import model_from_json
# NOTE(review): numpy is imported twice (as `np` above and bare here); both
# names are used in this file, so neither import can be safely dropped.
import numpy
from numpy import array
from numpy import reshape
import queue
# Thread-safe channel between the serial reader threads and main():
# reader side puts a list of 10 int sensor values (complete frame) or False
# (partial/excess frame); main() blocks on que.get().
que = queue.Queue()
# How to connect gloves to fingers
# Start at pinky 1-5 green
# Start at thumb 1-5 red
# Expected frame size in bytes. Kept for reference; the read_until() path in
# read_from_serial() uses its own 40-byte size cap instead.
BYTES_TO_READ = 23
#HC-06(RED)
# Red-glove Bluetooth serial module on COM9 at 1200 baud.
# NOTE(review): timeout=None makes reads block indefinitely, and rtscts=1
# enables hardware flow control — confirm the HC-06 is wired for RTS/CTS.
ser0 = serial.Serial("COM9", 1200,bytesize=serial.EIGHTBITS,timeout=None, parity=serial.PARITY_NONE, rtscts=1)
#RN(GREEN)
#Changed stopbits to 1
# Green-glove Bluetooth serial module on COM10, same settings as ser0.
ser1 = serial.Serial("COM10", 1200,bytesize=serial.EIGHTBITS,timeout=None, parity=serial.PARITY_NONE,rtscts=1)
# Training data: 'directory' is a placeholder path — point it at the real CSV.
# Column 0 is the key label (target); columns 1-10 are the ten flex-sensor
# readings, one per finger.
dataset = pd.read_csv('directory')
X = dataset.iloc[:, 1:11].values
y = dataset.iloc[:, 0].values
# Taking care of missing data
# NOTE(review): sklearn.preprocessing.Imputer was deprecated in 0.20 and
# removed in 0.22 — this script requires an old scikit-learn release
# (SimpleImputer is the modern replacement).
from sklearn.preprocessing import Imputer
imputer = Imputer(missing_values = 'NaN', strategy = 'mean', axis = 0)
# NOTE(review): after the iloc above, X holds only the 10 sensor columns
# (indices 0-9), so X[:, 1:11] skips the first sensor column — presumably
# 0:10 was intended; confirm before retraining.
imputer = imputer.fit(X[:, 1:11])
X[:, 1:11] = imputer.transform(X[:, 1:11])
# dataset starts at 1 due to columns from openoffice formatting, I suppose
# Encoding categorical data
# Encoding the Dependent Variable
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
# Map key labels to integer classes; labelencoder_y is kept alive so main()
# can invert predictions back into key labels.
labelencoder_y = LabelEncoder()
y = labelencoder_y.fit_transform(y)
# May need to only go this far in the encoding
# One-hot encode the integer classes (dense) for categorical_crossentropy.
onehotencoder = OneHotEncoder(sparse=False)
y = y.reshape(len(y), 1)
y = onehotencoder.fit_transform(y)
# Splitting the dataset into the Training set and Test set
from sklearn.model_selection import train_test_split
# Consider using another random_state seed when training the next time
# Changed random_state to 42
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)
# Feature Scaling
from sklearn.preprocessing import StandardScaler
# Fit the scaler on training data only; sc_X is reused in main() to scale
# live sensor frames before prediction.
sc_X = StandardScaler()
X_train = sc_X.fit_transform(X_train)
X_test = sc_X.transform(X_test)
# Importing keras dependencies (keras and model_from_json were already
# imported at the top of the file; these re-imports are harmless no-ops)
import keras
from keras.models import Sequential
from keras.layers import Dense
from keras.models import model_from_json
# Initializing the ANN
classifier = Sequential()
# Add 1st input hidden layer: 10 sensor inputs -> 17 ReLU units
classifier.add(Dense(17,kernel_initializer='uniform',activation='relu',input_dim = 10))
# Add second hidden layer
classifier.add(Dense(17,kernel_initializer='uniform',activation='relu'))
# Add the output layer: 28 softmax classes — presumably 26 letters plus two
# extra keys; confirm against the training labels.
classifier.add(Dense(28,kernel_initializer='uniform',activation='softmax'))
classifier.compile(loss="categorical_crossentropy", optimizer="adam", metrics=['accuracy'])
# load json and create model
#json_file = open('model.json', 'r')
#loaded_model_json = json_file.read()
#json_file.close()
#loaded_model = model_from_json(loaded_model_json)
# load weights into new model
# "directory" is a placeholder — point it at the saved HDF5 weights file.
classifier.load_weights("directory")
print("Loaded model from disk")
# load json and create model
#json_file = open('directory', 'r')
#loaded_model_json = json_file.read()
#json_file.close()
#classifier = model_from_json(loaded_model_json)
#print("Loaded model from disk")
#classifier.fit(X_train,y_train,batch_size=10,nb_epoch=1000)
# TF1-era idiom: capture the default graph at load time so prediction can run
# later under `with graph.as_default():` in main() (see the worker threads).
graph = tf.get_default_graph()
# serialize model to JSON
#model_json = classifier.to_json()
#with open("directory", "w") as json_file:
#    json_file.write(model_json)
# serialize weights to HDF5
#classifier.save_weights("directory")
#print("Saved model to disk")
#BEGINNING OF ADDED CODE | |
# Class to do the important procedures on the serial data | |
class serprocedure():
    """Parse a raw serial payload from one glove into per-sensor readings.

    Class-level (static) attributes are shared across instances and threads:
      sensorlist    -- accumulates 'A<idx><value>' tokens from both gloves
      missingdata   -- holdover list for missing data (currently unused)
      mcounter      -- count of sensor tokens missing from the current frame
      times_counter -- number of complete frames processed so far

    Results are handed back to main() through the module-level `que`:
    a list of 10 int sensor values on success, False on a partial frame.
    """

    # Plain assignments. The original guarded these with try/except NameError,
    # but the names are never pre-defined, so the except branch always ran —
    # these assignments are behaviorally identical.
    sensorlist = []
    missingdata = []
    mcounter = 0
    times_counter = 0

    def __init__(self, serdata, flag):
        """serdata: raw bytes read from the port; flag: 0 = red glove
        (sensors A0-A4), 1 = green glove (sensors A5-A9)."""
        self.serdata = serdata
        self.flag = flag
        self.serdatalooper(self.flag)
        # Only the green glove's thread (flag == 1) finalizes the frame, and
        # only when nothing was missing and exactly 10 readings accumulated.
        if self.flag == 1 and serprocedure.mcounter < 1 and len(serprocedure.sensorlist) == 10:
            print("Performing further extraction of data and exportation")
            self.furtherextraction(serprocedure.sensorlist)
            serprocedure.times_counter = serprocedure.times_counter + 1
        # The red glove's thread (flag 0) never discards; it only contributes.
        elif self.flag != 0:
            # Partial or excess frame: reset shared state and signal failure.
            serprocedure.mcounter = 0
            serprocedure.sensorlist.clear()
            print("Throwing away partial or excess result.")
            que.put(False)

    def extractanalog(self, analognumber, flag):
        """Search the payload for one 'A<analognumber><1-2 digit value>' token.

        Appends the matched token to the shared sensorlist; increments the
        shared missing-data counter when the token is absent.
        """
        found = re.search(r'(A' + str(analognumber) + r'\d{1,2})', str(self.serdata))
        if found is not None:
            serprocedure.sensorlist.append(str(found.group()))
        else:
            # BUGFIX: the original incremented mcounter only in an unreachable
            # branch (a match of this pattern is never empty, so the inner
            # `else` never ran). A missing token therefore left mcounter at 0
            # and an incomplete frame could be treated as complete. Count the
            # miss here, where found is None.
            serprocedure.mcounter += 1

    def furtherextraction(self, newlist):
        """Split each 'A<idx><value>' token and queue the 10 int readings."""
        findanaloglabel = []  # 'A<digit>' labels, kept in parse order
        findanalogvalue = []  # int sensor values, same order
        for token in newlist:
            # The label is 'A' plus one digit...
            findanaloglabel.append(re.search(r'(A\d)', token).group())
            # ...and the 1-2 digits behind it (lookbehind) are the reading.
            findanalogvalue.append(int(re.search(r'((?<=A\d{1})\d{1,2})', token).group()))
        print(findanalogvalue)
        # Clear shared state so the next frame starts fresh.
        serprocedure.sensorlist.clear()
        # Hand the parsed readings back to main().
        que.put(findanalogvalue)

    def serdatalooper(self, flag):
        """Extract this glove's five sensor tokens (A0-A4 for flag 0, A5-A9
        otherwise), then keep the shared list in label order."""
        start = 0 if flag == 0 else 5
        for sensor in range(start, start + 5):
            self.extractanalog(sensor, "mainlist")
        # Sort so readings from both gloves line up as A0..A9.
        serprocedure.sensorlist.sort()
# read from serial port
def read_from_serial(serial,board,flag):
    """Read one frame from an open pyserial port and hand it to serprocedure.

    serial -- an open serial.Serial port (the parameter name shadows the
              pyserial module; kept as-is for caller compatibility)
    board  -- human-readable board name, for debugging only
    flag   -- 0 for the red glove (A0-A4), 1 for the green glove (A5-A9)

    Nothing is returned; serprocedure pushes the parsed result (or False)
    onto the module-level `que`.
    """
    #print("reading from {}: port {}".format(board, port))
    # BUGFIX: read_until() compares its buffer against a *bytes* terminator.
    # The original passed the str '\n', which never equals a bytes slice in
    # Python 3, so the newline was never detected and every read waited out
    # the 40-byte size cap (or blocked, with timeout=None) instead.
    payload = serial.read_until(b'\n', 40)
    # Parse the raw frame; the serprocedure instance itself is not needed.
    serprocedure(payload, flag)
def main():
    """Acquisition/prediction loop: each iteration spawns one reader thread
    per glove, waits for a parsed 10-sensor frame on `que`, scales it with
    sc_X, and prints the predicted key label. Runs forever."""
    while True:
        # For some reason it's only updating every other go
        # Maybe to the keylogging here, and pass the key as a value to the thread
        # THE FIRST GO THROUGH SEEMS TO BE EXACTLY THE SAME AS THE SECOND GO THROUGH
        # Pass in the function, serial, board, and key as arguments
        # We'll pass in a flag to identify which board is being used
        t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)",0))
        t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)",1))
        # Start the threads
        t.start()
        t1.start()
        # Be careful this is blocking. Gets the missing data amount
        #print(q.get())
        # wait for all threads termination
        # The joins may be holding up the buffer flushes, if they are move them to the bottom
        #t.join()
        #t1.join()
        # Flush the serial input and output buffers
        # NOTE(review): the buffers are reset while the reader threads may
        # still be mid-read — this races with the in-flight reads; confirm
        # the resets are intended to happen before the joins below.
        ser0.reset_input_buffer()
        ser0.reset_output_buffer()
        ser1.reset_input_buffer()
        ser1.reset_output_buffer()
        t.join()
        t1.join()
        # `global` is only required for assignment; kept as originally written.
        global graph
        # TF1-era Keras: predictions from code spawned outside the load
        # context must run under the captured default graph.
        with graph.as_default():
            # Blocks until a reader thread posts a result: a list of 10 ints
            # for a complete frame, or False for a partial one.
            analog_to_predict = que.get()
            if analog_to_predict != False:
                # The input had to be scaled in order for it to work somewhat better
                analog_to_predict = sc_X.transform(numpy.array([analog_to_predict]))
                new_pred = classifier.predict_classes(analog_to_predict)
                # Map the integer class back to the original key label.
                new_pred = labelencoder_y.inverse_transform(new_pred)
                print(new_pred)
            else:
                print("que.get() was False")
main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment