Last active
August 24, 2020 04:12
-
-
Save brocksprogramming/bc3b156738a35bd0d99dd843790af70c to your computer and use it in GitHub Desktop.
Get serial data from analog sensors on Arduino Lilypad. Create serial com ports and siphen data through RegExp in Python. Emptying the serial data buffers is pivotal to this working. Also, be careful where you open the serial connection.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
This software was made in hopes of pioneering machine learning gloves. The way the program is setup, the main application is for typing on any hard surface without a keyboard. | |
Copyright (C) 2020 Brock Sterling Lynch | |
This program is free software: you can redistribute it and/or modify | |
it under the terms of the GNU General Public License as published by | |
the Free Software Foundation, either version 3 of the License, or | |
(at your option) any later version. | |
This program is distributed in the hope that it will be useful, | |
but WITHOUT ANY WARRANTY; without even the implied warranty of | |
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
GNU General Public License for more details. | |
You should have received a copy of the GNU General Public License | |
along with this program. If not, see <https://www.gnu.org/licenses/>. | |
You may reach the author of this software at brocksprogramming@gmail.com | |
''' | |
import serial | |
import re | |
import threading | |
import msvcrt | |
import csv | |
# This was changed to account for the newly added analog sensors on each hand. | |
# Setting it to 23 bytes was cutting off the analog sensor #9 | |
#HC-06(RED) | |
ser0 = serial.Serial("COM9", 1200,bytesize=serial.EIGHTBITS,timeout=None, parity=serial.PARITY_NONE, rtscts=1) | |
#RN(GREEN) | |
ser1 = serial.Serial("COM10", 1200,bytesize=serial.EIGHTBITS,timeout=None, parity=serial.PARITY_NONE, rtscts=1) | |
# Class to do the important procedures on the serial data | |
class serprocedure(): | |
# The following variables are class-level and are static, which means they will persist across class instances | |
try: | |
sensorlist | |
except NameError: | |
sensorlist = [] | |
# A list to store the missing data | |
try: | |
missingdata | |
except NameError: | |
missingdata = [] | |
# A counter for missing data | |
try: | |
mcounter | |
except NameError: | |
mcounter = 0 | |
# Boolean to store true false as to whether serdatalooper gets all data on the first run | |
try: | |
First_Try | |
except NameError: | |
First_Try = False | |
try: | |
times_counter | |
except NameError: | |
times_counter = 0 | |
# Use the __init__ constructor to take argument self and serdata | |
# Each method should only do 1 thing | |
# Use the __init__ constructor to take argument self and serdata | |
# Each method should only do 1 thing | |
def __init__(self,serdata,flag,key): | |
self.serdata = serdata | |
self.flag = flag | |
self.serdatalooper(self.flag) | |
self.key = key | |
# If it is the second thread with the second serial com, and missing counter less than 1, and sensorlist not greater than 10 | |
if self.flag == 1 and serprocedure.mcounter < 1 and len(serprocedure.sensorlist) == 10: | |
# Tell the user that the program is performing further extraction | |
print("Performing further extraction of data and exportation") | |
# Perform further extraction | |
self.furtherextraction(serprocedure.sensorlist) | |
serprocedure.times_counter = serprocedure.times_counter + 1 | |
# This tells how many times the user has typed | |
print("You've typed " + str(serprocedure.times_counter) + " Time(s)" ". You just typed the letter " + str(self.key)) | |
elif self.flag != 0: | |
# Reset counter | |
serprocedure.mcounter = 0 | |
# Clear the list so it doesn't build up | |
serprocedure.sensorlist.clear() | |
print("Throwing away partial result.") | |
# Check if there is missing data, if there is loop through that. Otherwise loop through the data normally. | |
# Method to extract the individual parts of the serial data | |
def extractanalog(self,analognumber,flag): | |
# Changed the decimal regexp to {1,2} quantifier | |
found = re.search(r'(A' + str(analognumber) + '\d{1,2})',str(self.serdata)) | |
if found is not None: | |
if found.group(): | |
# Create a list of data | |
# sensorlist must be moved to the top to be a class-level variable | |
#sensorlist = [] | |
serprocedure.sensorlist.append(str(found.group())) | |
return | |
else: | |
serprocedure.mcounter += 1 | |
# It's getting stuck here | |
return | |
def furtherextraction(self,newlist): | |
# A list to hold analog labels | |
findanaloglabel = [] | |
# A list to hold analog values | |
findanalogvalue = [] | |
z = 0 | |
print("This is the list in furtherextraction:") | |
print(newlist) | |
# Len counts 10 elements in the list but the index starts at 0 | |
while z < len(newlist): | |
# These will have to be made into lists | |
findanaloglabel.append(re.search(r'(A\d)',newlist[z]).group()) | |
# ?<= looks behind and only matches whats ahead | |
# Changed the decimal regexp to {1,2} quantifier | |
findanalogvalue.append(re.search(r'((?<=A\d{1})\d{1,2})',newlist[z]).group()) | |
# Increment z | |
z = z + 1 | |
# Call the export method | |
self.exporttocsv(findanaloglabel,findanalogvalue) | |
# Export to excel form | |
def exporttocsv(self,labels,values): | |
# Insert key value into list | |
values.insert(0,self.key) | |
with open('directory','a',newline='') as csvalues: | |
fhandle = csv.writer(csvalues) | |
# Export the row to csv file | |
fhandle.writerow(values) | |
print("Done exporting values to csv") | |
if self.flag == 1: | |
serprocedure.sensorlist.clear() | |
def serdatalooper(self,flag): | |
if flag == 0: | |
i = 0 | |
end = i + 4 | |
else: | |
i = 5 | |
end = i + 4 | |
# Loop through the entire length of the list and extract the data | |
while i <= end: | |
self.extractanalog(i,"mainlist") | |
# Increment the counter | |
i = i + 1 | |
# Sort the list | |
serprocedure.sensorlist.sort() | |
#if len(serprocedure.missingdata) < 1: | |
#q.put("There were no missing data") | |
#return True | |
#else: | |
#q.put("There are " + str(len(serprocedure.missingdata)) + " of data missing from list") | |
#return False | |
# read from serial port | |
def read_from_serial(serial,board,key,flag): | |
#print("reading from {}: port {}".format(board, port)) | |
payload = b'' | |
#bytes_count = 0 | |
# CHANGED BYTES_TO_READ to inWaiting implementation | |
#inwaitingbytes = serial.in_waiting() | |
# Changed to <= to make it count the last byte | |
#while bytes_count <= inwaitingbytes: | |
#read_bytes = serial.read(1) | |
# sum number of bytes returned (not 2), you have set the timeout on serial port | |
# see https://pythonhosted.org/pyserial/pyserial_api.html#serial.Serial.read | |
#bytes_count = bytes_count + len(read_bytes) | |
read_bytes = serial.read_until("\n",30) | |
payload = payload + read_bytes | |
# here you have the bytes, do your logic | |
# Instantiate object from serprocedure class | |
serprocobj = serprocedure(payload,flag,key) | |
# If the property THREE_TIMES_PROPERTY is greater than | |
#print("READ from {}: [{}]".format(board,payload)) | |
return | |
def counter(): | |
if 'cnt' not in counter.__dict__: | |
counter.cnt = 0 | |
else: | |
counter.cnt += 1 | |
return counter.cnt | |
def main(): | |
while True: | |
# For some reason it's only updating every other go | |
# Maybe to the keylogging here, and pass the key as a value to the thread | |
# THE FIRST GO THROUGH SEEMS TO BE EXACTLY THE SAME AS THE SECOND GO THROUGH | |
# This will be the response of 1, 2, or 3 | |
if 'rsp' not in main.__dict__: | |
# Ask what the user wants to train on | |
print("There are three options. Press 1 to train on alphabetic keys a-z, press 2 to train on space key, and press 3 to train on resting home-row position.") | |
main.rsp = input("What would you like to do? ") | |
# Depending on the answer received train on the given type | |
if main.rsp == '1': | |
if 'cnt' not in main.__dict__: | |
main.cnt = 0 | |
# Request that the user type each letter 20 times for the machine learning dataset to train on | |
main.ready = input("Please type each letter of the alphabet on the keyboard 20 times. Type y when ready and hit enter. ") | |
elif main.ready == 'y': | |
main.cnt += 1 | |
key = msvcrt.getch() | |
if key and counter() <= 520: | |
# Pass in the function, serial, board, and key as agrguments | |
# We'll pass in a flag to identify which board is being used | |
t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)",key.decode('ASCII'),0)) | |
t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)",key.decode('ASCII'),1)) | |
# Start the threads | |
t.start() | |
t1.start() | |
# Be careful this is blocking. Gets the missing data amount | |
#print(q.get()) | |
# wait for all threads termination | |
# The joins may be holding up the buffer flushes, if they are move them to the bottom | |
# Flush the serial input and output buffers | |
ser0.reset_input_buffer() | |
ser0.reset_output_buffer() | |
ser1.reset_input_buffer() | |
ser1.reset_output_buffer() | |
t.join() | |
t1.join() | |
else: | |
break | |
elif main.rsp == '2': | |
if 'cnt' not in main.__dict__: | |
main.cnt = 0 | |
# Request permission to record homerow entries | |
main.ready = input("Please place fingers in homerow positions and type space 20 times. Type j and hit enter. ") | |
elif main.ready == 'j': | |
main.cnt += 1 | |
# We don't need to record the key this time. | |
if counter() <= 20: | |
# Pass in the function, serial, board, and key as agrguments | |
# We'll pass in a flag to identify which board is being used | |
t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)","homerow",0)) | |
t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)","homerow",1)) | |
# Start the threads | |
t.start() | |
t1.start() | |
# Be careful this is blocking. Gets the missing data amount | |
#print(q.get()) | |
# wait for all threads termination | |
# The joins may be holding up the buffer flushes, if they are move them to the bottom | |
# Flush the serial input and output buffers | |
ser0.reset_input_buffer() | |
ser0.reset_output_buffer() | |
ser1.reset_input_buffer() | |
ser1.reset_output_buffer() | |
t.join() | |
t1.join() | |
else: | |
break | |
elif main.rsp == '3': | |
if 'cnt' not in main.__dict__: | |
main.cnt = 0 | |
# Request the user record space 20 times | |
main.ready = input("This step requires you to type the space key 20 times. To begin, press the y key and hit enter") | |
elif main.ready == 'y': | |
main.cnt += 1 | |
key = msvcrt.getch() | |
# If ready equals y(yes) and counter is less than or equal to 20, else break | |
if key and counter() <= 20: | |
# Pass in the function, serial, board, and key as agrguments | |
# We'll pass in a flag to identify which board is being used | |
t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)","space",0)) | |
t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)","space",1)) | |
# Start the threads | |
t.start() | |
t1.start() | |
# Be careful this is blocking. Gets the missing data amount | |
#print(q.get()) | |
# wait for all threads termination | |
# The joins may be holding up the buffer flushes, if they are, move them to the bottom | |
# Flush the serial input and output buffers | |
ser0.reset_input_buffer() | |
ser0.reset_output_buffer() | |
ser1.reset_input_buffer() | |
ser1.reset_output_buffer() | |
t.join() | |
t1.join() | |
else: | |
break | |
print("Thank you for training using this ML Gloves program") | |
# Added serial close to the program | |
ser0.close() | |
ser1.close() | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment