Created
May 30, 2022 20:46
-
-
Save erikbeall/81809aa1b68986c8e6ea505b520d3f62 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import sys | |
import numpy as np | |
import time | |
import os | |
import rospy | |
import serial | |
class monitor_node(): | |
def timer_fast_callback(self, timer): | |
# data should begin with 'SY' and end with 'TC' | |
data = b'' | |
while True: #self.mmw_comms.inWaiting() > 0: | |
data += self.mmw_comms.read(self.mmw_comms.inWaiting()) | |
if data[-2:]==b'TC': | |
break | |
if data[:2]!=b'SY' or data[-2:]!=b'TC': | |
print('framesync error, dropping frame') | |
return | |
control_chr = int(str(data[2])) | |
command_chr = int(str(data[3])) | |
lendata = int(str(data[4]))*256 + int(str(data[5])) | |
chksum = int(str(data[6+lendata])) | |
# verify checksum | |
chk=data[2] | |
#for d in data[3:-2]: | |
chk=data[0] | |
for d in data[1:]: | |
chk = chk^int(str(d)) | |
if control_chr == 0x01: | |
print('pack id: ', str(data[6:6+lendata])) | |
elif control_chr == 0x02: | |
print('product id: ', str(data[6:6+lendata])) | |
elif control_chr == 0x04: | |
print('radar test: ', str(data[6:6+lendata])) | |
elif control_chr == 0x05: | |
print('operating status: ', str(data[6:6+lendata])) | |
elif control_chr == 0x06: | |
print('radar localization information: ', str(data[6:6+lendata])) | |
elif control_chr == 0x80: | |
data_chr = int(str(data[6])) | |
if command_chr==0x01: | |
if self.presence==None: | |
self.presence = data_chr | |
if data_chr==1 and not self.presence: | |
print('Body Detected') | |
self.presence=True | |
elif data_chr==0 and self.presence: | |
print('No Body Detected') | |
self.presence=False | |
elif command_chr==0x02: | |
if data_chr==0x0: | |
print(' campaign information: None') | |
elif data_chr==0x01: | |
print(' campaign information: Close') | |
elif data_chr==0x02: | |
print(' campaign information: Far') | |
elif data_chr==0x03: | |
print(' campaign information: Chaotic') | |
elif command_chr==0x03: | |
print(' Body motion: ', data_chr) | |
elif command_chr==0x04: | |
print(' Number of people changed, now: ', data_chr) | |
elif command_chr==0x05: | |
# there's an additional message type with command_chr 0x05 | |
pass | |
else: | |
print(' 0x80 unknown command_chr: ', hex(control_chr), hex(command_chr), lendata, chksum, len(data) - 9) | |
elif control_chr == 0x81: | |
data_chr = int(str(data[6])) | |
if command_chr==0x01: | |
print(' HR: ', int(str(data[7])), ', qual (1==normal)=', int(str(data[6]))) | |
elif command_chr==0x02: | |
print(' RR: ', int(str(data[7])), ', qual (1==normal)=', int(str(data[6]))) | |
elif command_chr==0x03: | |
print(' Resp pos warning out of range: ', int(str(data[6])), ', dist/angle=', int(str(data[7]))*256+int(str(data[8])), int(str(data[9]))*256+int(str(data[10]))) | |
elif command_chr==0x04: | |
if data_chr==0x4: | |
print('detection in progress...') | |
else: | |
print('resp 0x04 bye information: ', data_chr) | |
elif command_chr==0x07: | |
print('location detection anomaly: ', data_chr) | |
elif command_chr==0x08: | |
print(' resting distance: ', data_chr) | |
elif command_chr==0x09: | |
print(' resting angle: ', data_chr) | |
print('RR/HR detection, len: ', lendata) | |
elif control_chr == 0x85: | |
# unknown control_chr 0x85 | |
pass | |
else: | |
print('unknown control_chr: ', hex(control_chr), hex(command_chr), lendata, chksum, len(data) - 9) | |
# verify checksum | |
#print(np.sum([int(str(d)) for d in data])) | |
#print(chksum, np.sum([int(str(d)) for d in data[6:lendata+6]])) | |
#print(hex(control_chr), hex(command_chr), lendata, chksum, len(data) - 9) | |
def __init__(self): | |
self.presence=None | |
baud=115200 | |
port='/dev/ttyAMA1' | |
par='N' | |
stop=1 | |
timeout=1.0 | |
bit=8 | |
self.mmw_comms = serial.Serial(port=port, baudrate=baud, bytesize=bit, parity=par, stopbits=stop, timeout=timeout) | |
self.mmw_comms.flush() | |
self.mmw_comms.read_all() | |
self.timer_10hz = rospy.Timer(rospy.Duration(0.1), self.timer_fast_callback) | |
print('Node up') | |
sys.stdout.flush() | |
def main(args): | |
rospy.init_node('mmw_node', anonymous=False) | |
node = monitor_node() | |
try: | |
rospy.spin() | |
except KeyboardInterrupt: | |
print("Shutting down mmw node") | |
node.exit_handler() | |
time.sleep(1) | |
if __name__ == '__main__': | |
main(sys.argv) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment