Skip to content

Instantly share code, notes, and snippets.

@dpnkrg
Created September 19, 2018 10:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dpnkrg/369e26c51575aad7bd4a1f9ccbc7a1a3 to your computer and use it in GitHub Desktop.
Save dpnkrg/369e26c51575aad7bd4a1f9ccbc7a1a3 to your computer and use it in GitHub Desktop.
"""
Initial implementation of ReadSerial.
This version is adapted directly from the GitHub project and
implements the initial requirements.
TODO: testing.
Features:
1. Advanced version of Serial_v.0.1.
2. The output file name is generated from the current date-time stamp.
3. Before serial data is taken, the test values are entered by the user.
4. Code clean-up of mat-plot.
5. Added a sleep timer.
Changelog:
1. Seconds added to the file name.
2. OS type detection added.
"""
import serial
import csv
import re
import time
from time import sleep
import os
import redis
import uuid
"""
Setup Variables
"""
# Output CSV name: "MAT_" plus the current date-time stamp (including seconds).
filename = "MAT" + "_" + time.strftime("%Y-%m-%d_%H.%M.%S") + ".csv"
baud = 115200  # Must match Arduino baud rate
timeout = 4  # Seconds; must be in sync with the device time clock
max_num_readings = 16000  # Cap on the number of lines read_serial_data() collects
num_signals = 1
mylist = list()  # Filled by main() with the user-entered test parameters
mylist1 = list()
r = redis.StrictRedis(host='localhost', port=6379, db=0)

# Choose the serial port from the OS family.
# os.name is 'posix' on both macOS and Linux, so Linux hosts take the first
# branch. The original third test (os.name == 'linux' or 'linux2') was always
# true because of the bare non-empty string, and os.name never equals
# 'linux'; it therefore only ever acted as the catch-all written here.
# TODO: set a real device path for Linux hosts if one is required.
if os.name == 'posix':
    portPath = "/dev/tty.usbmodem1421"
elif os.name == 'nt':
    portPath = "COM6"  # or "COM7"
else:
    portPath = ""
def create_serial_obj(portPath, baud_rate, tout):
    """
    Open the serial port and hand back the resulting pyserial object.

    portPath  -- device path or port name given to serial.Serial
    baud_rate -- baud rate given to serial.Serial
    tout      -- value passed as the Serial ``timeout`` keyword
    """
    port = serial.Serial(portPath, baud_rate, timeout=tout)
    return port
def read_serial_data(serial, max_readings=None):
    """
    Read lines from a pyserial object until a read times out or the
    reading cap is reached, and return them as a list.

    serial       -- an open pyserial object (anything providing
                    flushInput() and readline() works)
    max_readings -- maximum number of lines to collect; defaults to the
                    module-level max_num_readings

    Returns the list of raw lines exactly as readline() produced them.
    """
    if max_readings is None:
        max_readings = max_num_readings

    # Drop anything buffered before we started listening.
    serial.flushInput()

    serial_data = []
    while len(serial_data) < max_readings:
        serial_line = serial.readline()
        # An empty result means the read timed out. Test truthiness rather
        # than comparing with '' so that b'' (Python 3) is recognised too;
        # otherwise the loop would never see the timeout.
        if not serial_line:
            break
        serial_data.append(serial_line)
    return serial_data
def is_number(string):
    """
    Report whether float() accepts the given string.

    Returns True when the conversion succeeds and False when it raises
    ValueError.
    """
    try:
        float(string)
    except ValueError:
        return False
    else:
        return True
def clean_serial_data(data):
    """
    Extract the numbers from each serial line and return them as floats.

    data -- list of lines read from the serial port (str or bytes)

    Lines that yield fewer than two numbers are dropped.
    Given something like: ['0.5000,33\\r\\n', '1.0000,283\\r\\n']
    Returns: [[0.5, 33.0], [1.0, 283.0]]

    Only unsigned decimal digits are recognised; signs and exponents are
    treated as separators, as in the original pattern.
    """
    # Matches "12", "12.", "12.5" and ".5". Unlike the old
    # "\d*\.\d*|\d*" pattern it never yields '' or '.', so every match is
    # directly convertible with float() and needs no post-filtering.
    number_re = re.compile(r"\d+\.\d*|\.\d+|\d+")

    clean_data = []
    for line in data:
        # pyserial returns bytes on Python 3; a str pattern cannot be
        # applied to bytes, so decode first (non-ASCII noise is discarded).
        if isinstance(line, bytes):
            line = line.decode('ascii', 'ignore')
        line_data = [float(token) for token in number_re.findall(line)]
        if len(line_data) >= 2:
            clean_data.append(line_data)
    print('clean data')
    print(clean_data)
    return clean_data
def save_to_csv(data, filename, header=None):
    """
    Write a header line followed by the rows of data to a CSV file.

    data     -- list of lists; each inner list becomes one CSV row
    filename -- path of the file to create (overwritten if it exists)
    header   -- object whose str() is written as the first line; defaults
                to the module-level mylist of test parameters

    The file layout is: str(header), a blank line, then the CSV rows.
    """
    import sys  # local import: only needed to pick the open() mode

    if header is None:
        header = mylist

    # The csv module wants a binary file on Python 2 but a text file opened
    # with newline='' on Python 3; 'wb' on Python 3 makes every write fail
    # with TypeError.
    if sys.version_info[0] >= 3:
        csvfile = open(filename, 'w', newline='')
    else:
        csvfile = open(filename, 'wb')

    with csvfile:
        csvfile.write(str(header))
        csvfile.write('\n\n')
        csv.writer(csvfile).writerows(data)
def read_serial():
    """
    Run one acquisition: read the serial port, clean the lines, save them
    to the CSV file named by the module-level filename, and store a record
    in the Redis hash "data".

    Uses the module-level settings (portPath, baud, timeout, filename),
    the user parameters in mylist, and the Redis client r. Returns None.
    """
    print("Creating serial object...")
    serial_obj = create_serial_obj(portPath, baud, timeout)
    try:
        print("Reading serial data...")
        serial_data = read_serial_data(serial_obj)
    finally:
        # Release the port even if reading fails; it was never closed before.
        serial_obj.close()
    print("serial data")
    print(serial_data)
    print("Data Length")
    print(len(serial_data))

    print("Cleaning data...")
    clean_data = clean_serial_data(serial_data)

    print("Saving to csv...")
    save_to_csv(clean_data, filename)

    print("Save to redis")
    print("name :", mylist1)
    # A stray trailing comma used to turn this into a one-element tuple,
    # so the stored uuid looked like "('...',)".
    public_id = str(uuid.uuid1())
    values = {
        "uuid": public_id,
        "device_id": "raspberrypi",
        "client_id": "arduino",
        # Hash fields must be scalar; stringify the lists explicitly
        # instead of relying on the client to coerce them (newer redis-py
        # releases reject list values).
        "Material": str(mylist),
        "Time": time.strftime("%Y-%m-%d_%H.%M.%S"),
        "RawData": str(clean_data),
    }
    r.hmset("data", values)
    print("End...")
def main():
    """
    Prompt for the six test parameters, record them in the module-level
    mylist (each with its identifying prefix), wait three seconds, then
    run read_serial().
    """
    # raw_input only exists on Python 2; fall back to input on Python 3.
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    # (prompt shown to the user, prefix stored in front of the answer)
    fields = (
        ('Enter Weight Value : ', 'Weight_'),
        ('Enter Top Layer : ', 'TL_'),
        ('Enter Bottom Layer : ', 'BL_'),
        ('Enter Piso Resistive : ', 'PR_'),
        ('Enter Conductive Foam : ', 'CF_'),
        ('Enter Non Conductive Foam : ', 'NCF_'),
    )
    # Collect every answer first so mylist is only touched once all six
    # prompts have been answered.
    entries = [prefix + str(read_line(prompt)) for prompt, prefix in fields]
    mylist.extend(entries)

    print("Processing...")
    sleep(3)
    read_serial()


if __name__ == "__main__":
    main()
@dpnkrg
Copy link
Author

dpnkrg commented Sep 19, 2018

i

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment