Test program to poll Neato laser distance sensor data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial | |
import time | |
lf = b'\n' | |
eom = b'\x1a' | |
class NeatoCommander(): | |
def __init__(self, port="/dev/ttyACM1", timeout=1): | |
s = serial.Serial() | |
s.baudrate = 1152000 | |
s.port = port | |
s.timeout = timeout | |
s.open() | |
if s.is_open: | |
print("Successfully opened port") | |
self.sp = s | |
self.all_responses = dict() | |
def now(self): | |
return time.perf_counter() | |
def send_neato_command(self, command): | |
self.sp.write(command + lf) | |
echo = self.sp.readline() | |
if echo[:len(command)] != command: | |
raise ValueError("Neato did not echo back command as expected: {}".format(command)) | |
first_char = self.sp.read(1) | |
if first_char != eom: | |
raw_response = [] | |
while first_char != eom: | |
line_remainder = self.sp.readline() | |
line = first_char + line_remainder | |
raw_response.append(line) | |
first_char = self.sp.read(1) | |
self.all_responses[self.now()] = raw_response | |
return len(raw_response) | |
else: | |
return 0 | |
def exit(self): | |
print("Closing port") | |
self.sp.close() | |
if __name__ == "__main__": | |
nc = NeatoCommander() | |
try: | |
nc.send_neato_command(b"testmode on") | |
nc.send_neato_command(b"setldsrotation on") | |
marker = nc.now() | |
while nc.now() < marker+10: | |
nc.send_neato_command(b"getldsscan") | |
finally: | |
nc.send_neato_command(b"setldsrotation off") | |
nc.send_neato_command(b"testmode off") | |
nc.exit() | |
sorted_time = sorted(nc.all_responses.keys()) | |
prev_response = frozenset(nc.all_responses[sorted_time[0]]) | |
unique = 1 | |
for timestamp in sorted_time: | |
now_response = frozenset(nc.all_responses[timestamp]) | |
if len(now_response-prev_response)>0: | |
print("Unique data at time {}".format(timestamp)) | |
unique += 1 | |
else: | |
print("Repeated data at time{}".format(timestamp)) | |
prev_response = now_response | |
print("Found {} updates in {} elements over {}".format(unique,len(nc.all_responses), sorted_time[-1]-sorted_time[0])) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample output from my Neato XV-21, informing us we can poll LDS data a little over 10 times a second over USB serial port in test mode.
Judging by the fact almost half of the data were repeats, that's roughly double the speed we require to keep up to date and see all LDS data.
(The first few readings were repeated due to laser distance scanner spinning up to speed.)