Skip to content

Instantly share code, notes, and snippets.

@mattwilliamson
Created April 18, 2023 13:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattwilliamson/d1f288828afac3ff28d36b526c766ffc to your computer and use it in GitHub Desktop.
Save mattwilliamson/d1f288828afac3ff28d36b526c766ffc to your computer and use it in GitHub Desktop.
camsense x1 lidar parser
import serial
from struct import unpack
import matplotlib.pyplot as plt
import numpy as np
import queue
import threading
from matplotlib.animation import FuncAnimation
import time
import sys
from scipy.ndimage import median_filter, gaussian_filter1d
HEAD1 = 0x55
HEAD2 = 0xaa
HEAD3 = 0x03
HEAD4 = 0x08
ST_HEAD1 = 0
ST_HEAD2 = 1
ST_HEAD3 = 2
ST_HEAD4 = 3
ST_SPEED = 4
ST_START = 5
ST_DATA = 6
ST_END = 7
ST_CRC = 8
state = ST_HEAD1
distance = np.ones(360) * 0x8000
quality = np.zeros(360)
angle_index = np.array([])
last_start = 0
start = 0
buffer = b""
que = queue.Queue(20)
def parse_data(ser: serial.Serial):
global state
global distance
global quality
global last_start
global que
global buffer
global start_angle
if state == ST_HEAD1:
head = ser.read()
if int.from_bytes(head, "little") == HEAD1:
state = ST_HEAD2
elif state == ST_HEAD2:
head = ser.read()
if int.from_bytes(head, "little") == HEAD2:
state = ST_HEAD3
else:
state = ST_HEAD1
elif state == ST_HEAD3:
head = ser.read()
if int.from_bytes(head, "little") == HEAD3:
state = ST_HEAD4
else:
state = ST_HEAD1
elif state == ST_HEAD4:
head = ser.read()
if int.from_bytes(head, "little") == HEAD4:
state = ST_SPEED
else:
state = ST_HEAD1
elif state == ST_SPEED:
speed_bytes = ser.read(2)
# speed = int.from_bytes(speed_bytes, "little")
# print("speed: {}".format(speed))
state = ST_START
elif state == ST_START:
start_bytes = ser.read(2)
start = int.from_bytes(start_bytes, "little")
start_angle = start / 64 - 640
if last_start != 0:
if start < last_start:
## preprocess
# distance = median_filter(distance, 5)
## FIXME: kind of calibration for data rotation
shift = 10
distance = np.roll(distance, shift)
quality = np.roll(quality, shift)
##
temp = distance
print("invalid ratio: {}".format(temp[temp==0x8000].size/temp.size)) # show ratio of invalid data
que.put((distance, quality))
if 10 < que.qsize():
que.get(5) ## FIXME: drop frame for "real-time" display
que.put((distance, quality))
distance = np.ones(360) * 0x8000
quality = np.zeros(360)
state = ST_DATA
else: # ignore the data when start
ser.read(28)
state = ST_HEAD1
last_start = start
elif state == ST_DATA:
buffer = ser.read(24)
state = ST_END
elif state == ST_END:
end_bytes = ser.read(2)
end = int.from_bytes(end_bytes, "little")
end_angle = end / 64 - 640
if end_angle < start_angle:
end_angle = end_angle + 360
res = (end_angle - start_angle) / 8
if res == 0:
state = ST_HEAD1
else:
# print("start: {}, end: {}, res: {}".format(start_angle, end_angle, res))
angles = np.arange(start_angle, end_angle, res)
indices = (np.round(angles) % 360).astype(int)
# print("indices: {}".format(indices))
for i, j in zip(range(0, 24, 3), indices):
d, q = unpack("<HB", buffer[i:i+3])
if distance[j] != 0x8000:
distance[j] = (d + distance[j]) / 2
else:
distance[j] = d
if quality[j] != 0:
quality[j] = (q + quality[j]) / 2
else:
quality[j] = q
state = ST_CRC
elif state == ST_CRC: # FIXME: CRC16???
crc = int.from_bytes(ser.read(2), "little")
# print("crc: {}".format(hex(crc)))
state = ST_HEAD1
distance_line = None
quality_line = None
def update_plot(data):
global distance_line
global quality_line
distance_line.set_ydata(data[0][::-1])
quality_line.set_ydata(data[1][::-1])
return distance_line, quality_line
def gen_lidar_data():
global que
yield que.get()
def lidar_animation():
global distance_line
global quality_line
fig = plt.figure()
ax = fig.add_subplot(111, projection="polar")
radian = np.arange(0, np.pi*2, np.deg2rad(1))
ax.set_rlim(0, 8000)
distance_line = ax.plot(radian, np.zeros_like(radian), "r.")[0]
quality_line = ax.plot(radian, np.zeros_like(radian), "g.")[0]
ani = FuncAnimation(fig, update_plot, gen_lidar_data)
plt.show()
def main():
ser = serial.Serial("/dev/ttyUSB0", 115200, timeout=1)
ser.flush()
t = threading.Thread(target=lidar_animation)
t.start()
try:
while True:
parse_data(ser)
except KeyboardInterrupt:
t.join()
ser.close()
print("Exit camsense")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment