Skip to content

Instantly share code, notes, and snippets.

@brocksprogramming
Last active August 24, 2020 04:12
Show Gist options
  • Save brocksprogramming/bc3b156738a35bd0d99dd843790af70c to your computer and use it in GitHub Desktop.
Save brocksprogramming/bc3b156738a35bd0d99dd843790af70c to your computer and use it in GitHub Desktop.
Get serial data from analog sensors on Arduino Lilypad. Create serial com ports and siphen data through RegExp in Python. Emptying the serial data buffers is pivotal to this working. Also, be careful where you open the serial connection.
'''
This software was made in hopes of pioneering machine learning gloves. The way the program is setup, the main application is for typing on any hard surface without a keyboard.
Copyright (C) 2020 Brock Sterling Lynch
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
You may reach the author of this software at brocksprogramming@gmail.com
'''
import serial
import re
import threading
import msvcrt
import csv
# This was changed to account for the newly added analog sensors on each hand.
# Setting it to 23 bytes was cutting off the analog sensor #9
#HC-06(RED)
ser0 = serial.Serial("COM9", 1200,bytesize=serial.EIGHTBITS,timeout=None, parity=serial.PARITY_NONE, rtscts=1)
#RN(GREEN)
ser1 = serial.Serial("COM10", 1200,bytesize=serial.EIGHTBITS,timeout=None, parity=serial.PARITY_NONE, rtscts=1)
# Class to do the important procedures on the serial data
class serprocedure():
# The following variables are class-level and are static, which means they will persist across class instances
try:
sensorlist
except NameError:
sensorlist = []
# A list to store the missing data
try:
missingdata
except NameError:
missingdata = []
# A counter for missing data
try:
mcounter
except NameError:
mcounter = 0
# Boolean to store true false as to whether serdatalooper gets all data on the first run
try:
First_Try
except NameError:
First_Try = False
try:
times_counter
except NameError:
times_counter = 0
# Use the __init__ constructor to take argument self and serdata
# Each method should only do 1 thing
# Use the __init__ constructor to take argument self and serdata
# Each method should only do 1 thing
def __init__(self,serdata,flag,key):
self.serdata = serdata
self.flag = flag
self.serdatalooper(self.flag)
self.key = key
# If it is the second thread with the second serial com, and missing counter less than 1, and sensorlist not greater than 10
if self.flag == 1 and serprocedure.mcounter < 1 and len(serprocedure.sensorlist) == 10:
# Tell the user that the program is performing further extraction
print("Performing further extraction of data and exportation")
# Perform further extraction
self.furtherextraction(serprocedure.sensorlist)
serprocedure.times_counter = serprocedure.times_counter + 1
# This tells how many times the user has typed
print("You've typed " + str(serprocedure.times_counter) + " Time(s)" ". You just typed the letter " + str(self.key))
elif self.flag != 0:
# Reset counter
serprocedure.mcounter = 0
# Clear the list so it doesn't build up
serprocedure.sensorlist.clear()
print("Throwing away partial result.")
# Check if there is missing data, if there is loop through that. Otherwise loop through the data normally.
# Method to extract the individual parts of the serial data
def extractanalog(self,analognumber,flag):
# Changed the decimal regexp to {1,2} quantifier
found = re.search(r'(A' + str(analognumber) + '\d{1,2})',str(self.serdata))
if found is not None:
if found.group():
# Create a list of data
# sensorlist must be moved to the top to be a class-level variable
#sensorlist = []
serprocedure.sensorlist.append(str(found.group()))
return
else:
serprocedure.mcounter += 1
# It's getting stuck here
return
def furtherextraction(self,newlist):
# A list to hold analog labels
findanaloglabel = []
# A list to hold analog values
findanalogvalue = []
z = 0
print("This is the list in furtherextraction:")
print(newlist)
# Len counts 10 elements in the list but the index starts at 0
while z < len(newlist):
# These will have to be made into lists
findanaloglabel.append(re.search(r'(A\d)',newlist[z]).group())
# ?<= looks behind and only matches whats ahead
# Changed the decimal regexp to {1,2} quantifier
findanalogvalue.append(re.search(r'((?<=A\d{1})\d{1,2})',newlist[z]).group())
# Increment z
z = z + 1
# Call the export method
self.exporttocsv(findanaloglabel,findanalogvalue)
# Export to excel form
def exporttocsv(self,labels,values):
# Insert key value into list
values.insert(0,self.key)
with open('directory','a',newline='') as csvalues:
fhandle = csv.writer(csvalues)
# Export the row to csv file
fhandle.writerow(values)
print("Done exporting values to csv")
if self.flag == 1:
serprocedure.sensorlist.clear()
def serdatalooper(self,flag):
if flag == 0:
i = 0
end = i + 4
else:
i = 5
end = i + 4
# Loop through the entire length of the list and extract the data
while i <= end:
self.extractanalog(i,"mainlist")
# Increment the counter
i = i + 1
# Sort the list
serprocedure.sensorlist.sort()
#if len(serprocedure.missingdata) < 1:
#q.put("There were no missing data")
#return True
#else:
#q.put("There are " + str(len(serprocedure.missingdata)) + " of data missing from list")
#return False
# read from serial port
def read_from_serial(serial,board,key,flag):
#print("reading from {}: port {}".format(board, port))
payload = b''
#bytes_count = 0
# CHANGED BYTES_TO_READ to inWaiting implementation
#inwaitingbytes = serial.in_waiting()
# Changed to <= to make it count the last byte
#while bytes_count <= inwaitingbytes:
#read_bytes = serial.read(1)
# sum number of bytes returned (not 2), you have set the timeout on serial port
# see https://pythonhosted.org/pyserial/pyserial_api.html#serial.Serial.read
#bytes_count = bytes_count + len(read_bytes)
read_bytes = serial.read_until("\n",30)
payload = payload + read_bytes
# here you have the bytes, do your logic
# Instantiate object from serprocedure class
serprocobj = serprocedure(payload,flag,key)
# If the property THREE_TIMES_PROPERTY is greater than
#print("READ from {}: [{}]".format(board,payload))
return
def counter():
if 'cnt' not in counter.__dict__:
counter.cnt = 0
else:
counter.cnt += 1
return counter.cnt
def main():
while True:
# For some reason it's only updating every other go
# Maybe to the keylogging here, and pass the key as a value to the thread
# THE FIRST GO THROUGH SEEMS TO BE EXACTLY THE SAME AS THE SECOND GO THROUGH
# This will be the response of 1, 2, or 3
if 'rsp' not in main.__dict__:
# Ask what the user wants to train on
print("There are three options. Press 1 to train on alphabetic keys a-z, press 2 to train on space key, and press 3 to train on resting home-row position.")
main.rsp = input("What would you like to do? ")
# Depending on the answer received train on the given type
if main.rsp == '1':
if 'cnt' not in main.__dict__:
main.cnt = 0
# Request that the user type each letter 20 times for the machine learning dataset to train on
main.ready = input("Please type each letter of the alphabet on the keyboard 20 times. Type y when ready and hit enter. ")
elif main.ready == 'y':
main.cnt += 1
key = msvcrt.getch()
if key and counter() <= 520:
# Pass in the function, serial, board, and key as agrguments
# We'll pass in a flag to identify which board is being used
t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)",key.decode('ASCII'),0))
t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)",key.decode('ASCII'),1))
# Start the threads
t.start()
t1.start()
# Be careful this is blocking. Gets the missing data amount
#print(q.get())
# wait for all threads termination
# The joins may be holding up the buffer flushes, if they are move them to the bottom
# Flush the serial input and output buffers
ser0.reset_input_buffer()
ser0.reset_output_buffer()
ser1.reset_input_buffer()
ser1.reset_output_buffer()
t.join()
t1.join()
else:
break
elif main.rsp == '2':
if 'cnt' not in main.__dict__:
main.cnt = 0
# Request permission to record homerow entries
main.ready = input("Please place fingers in homerow positions and type space 20 times. Type j and hit enter. ")
elif main.ready == 'j':
main.cnt += 1
# We don't need to record the key this time.
if counter() <= 20:
# Pass in the function, serial, board, and key as agrguments
# We'll pass in a flag to identify which board is being used
t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)","homerow",0))
t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)","homerow",1))
# Start the threads
t.start()
t1.start()
# Be careful this is blocking. Gets the missing data amount
#print(q.get())
# wait for all threads termination
# The joins may be holding up the buffer flushes, if they are move them to the bottom
# Flush the serial input and output buffers
ser0.reset_input_buffer()
ser0.reset_output_buffer()
ser1.reset_input_buffer()
ser1.reset_output_buffer()
t.join()
t1.join()
else:
break
elif main.rsp == '3':
if 'cnt' not in main.__dict__:
main.cnt = 0
# Request the user record space 20 times
main.ready = input("This step requires you to type the space key 20 times. To begin, press the y key and hit enter")
elif main.ready == 'y':
main.cnt += 1
key = msvcrt.getch()
# If ready equals y(yes) and counter is less than or equal to 20, else break
if key and counter() <= 20:
# Pass in the function, serial, board, and key as agrguments
# We'll pass in a flag to identify which board is being used
t = threading.Thread(target=read_from_serial, args=(ser0,"HC-06(Red)","space",0))
t1 = threading.Thread(target=read_from_serial, args=(ser1,"RN(Green)","space",1))
# Start the threads
t.start()
t1.start()
# Be careful this is blocking. Gets the missing data amount
#print(q.get())
# wait for all threads termination
# The joins may be holding up the buffer flushes, if they are, move them to the bottom
# Flush the serial input and output buffers
ser0.reset_input_buffer()
ser0.reset_output_buffer()
ser1.reset_input_buffer()
ser1.reset_output_buffer()
t.join()
t1.join()
else:
break
print("Thank you for training using this ML Gloves program")
# Added serial close to the program
ser0.close()
ser1.close()
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment