Skip to content

Instantly share code, notes, and snippets.

@akirayou
Created May 16, 2021 08:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akirayou/9e7c4e919edfe67dea43310d2b068c86 to your computer and use it in GitHub Desktop.
Save akirayou/9e7c4e919edfe67dea43310d2b068c86 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Example implementation for YDLIDAR X4
"""
#pip install pyserial-asyncio
import serial_asyncio
import asyncio
import time
#Spyder / IPython consoles already run their own asyncio event loop, which
#conflicts with the loop used here, so nest_asyncio is applied to allow nesting.
import nest_asyncio
nest_asyncio.apply()
import struct
import numpy as np
###
#This single function is all the code needed to use the YD LIDAR X4 through pyserial-asyncio.
#It can be copied into another project as-is.
###
def get_YD_X4(call_back=None):
    """Create the scan accumulator and the asyncio protocol class for a YDLIDAR X4.

    Args:
        call_back: Optional callable. It is invoked with the completed scan
            (``{"ang": ndarray, "dist": ndarray}``) every time the reported
            angle wraps around, i.e. once per revolution.

    Returns:
        tuple: ``(data, protocol_class)``. ``data`` is the shared
        ``YD_X4_Data`` instance that receives every decoded packet;
        ``protocol_class`` is an ``asyncio.Protocol`` subclass meant to be
        used as the protocol factory of
        ``serial_asyncio.create_serial_connection``.
    """
    header = b"\xaa\x55"   # packet header as it appears on the wire
    header_len = 10        # PH(2) + CT(1) + LSN(1) + FSA(2) + LSA(2) + CS(2)

    class YD_X4_Data():
        """Collects decoded packets and cuts them into one-revolution scans."""

        def __init__(self, call_back):
            # Most recent completed revolution.
            self.last_data = {"ang": np.array([]), "dist": np.array([])}
            # End angle of the previously added packet; used to detect wrap-around.
            self.last_angle = 0
            # Samples of the revolution that is still being collected.
            self._store_angles = np.array([], dtype=np.float64)
            self._store_dists = np.array([], dtype=np.float64)
            self.call_back = call_back

        def corrected_data(self):
            """Return the last completed scan with the angle correction applied.

            Only samples whose distance is above 0.1 are corrected; the
            correction is a fitted function of distance taken from
            https://bitbucket.org/akira_you/esp-ydlidarx4/src/master/main/ydlidarTask.cpp

            Returns:
                dict: ``{"ang": corrected copy of the angles,
                "dist": the stored distance array (not copied)}``.
            """
            angles = np.copy(self.last_data["ang"])
            dists = self.last_data["dist"]
            vidx = np.nonzero(0.1 < dists)[0]
            angles[vidx] += 1 / (0.0008162604522317974 * dists[vidx] - 0.00220016337230633) - 7.990596267964586
            # Alternative closed form kept for reference:
            # angles[vidx]+= 180/np.pi*np.arctan(21.8*(155.3-dists[vidx])/(155.3*dists[vidx]) )
            return {"ang": angles, "dist": dists}

        def add_packet(self, ct, fsa, lsa, samples):
            """Append one decoded packet and publish a scan on wrap-around.

            Args:
                ct: Packet type byte; only packets with ``ct == 0`` are used.
                fsa: Angle of the first sample in degrees.
                lsa: Angle of the last sample in degrees.
                samples: Raw sample words; the stored distance is ``sample / 4``.
            """
            if ct != 0:
                return
            dists = np.asarray(samples, dtype=np.float64) / 4
            # Unwrap the end angle so that the interpolation runs forward.
            r_lsa = lsa + 360 if lsa < fsa else lsa
            angles = np.linspace(fsa, r_lsa, len(dists))
            angles[360 < angles] -= 360
            if lsa < self.last_angle:
                # The angle wrapped: everything before the first sample that
                # lies below the previous end angle closes the old revolution.
                below = np.nonzero(angles < self.last_angle)[0]
                # With empty / single-sample packets there may be no such
                # sample; then the whole packet belongs to the old revolution.
                zero_cross = below[0] if below.size else len(angles)
                self.last_data = {
                    "ang": np.r_[self._store_angles, angles[:zero_cross]],
                    "dist": np.r_[self._store_dists, dists[:zero_cross]],
                }
                if self.call_back is not None:
                    self.call_back(self.last_data)
                self._store_angles = angles[zero_cross:]
                self._store_dists = dists[zero_cross:]
            else:
                self._store_angles = np.r_[self._store_angles, angles]
                self._store_dists = np.r_[self._store_dists, dists]
            self.last_angle = lsa

    _the_data = YD_X4_Data(call_back)

    class YD_X4_packet(asyncio.Protocol):
        """Serial protocol that frames the byte stream into X4 packets."""

        # Default so that data_received is safe even before connection_made.
        buffer = b''

        def connection_made(self, transport):
            """Start the motor and send the start command (twice)."""
            self.transport = transport
            print('port opened', transport)
            transport.serial.dtr = True  # motor start
            transport.serial.write(b"\xA5\x60")
            # NOTE: blocking sleep; it stalls the event loop for 0.1 s once at start-up.
            time.sleep(0.1)
            transport.serial.write(b"\xA5\x60")
            print("connected")
            self.buffer = b''
            self.serial = transport.serial

        def data_received(self, recv_data):
            """Buffer incoming bytes and decode every complete packet.

            Inside the loop ``break`` means "need more data" and ``continue``
            means "re-parse from the new buffer start".
            """
            self.buffer += recv_data
            while len(self.buffer) >= header_len:
                # Seek a candidate header delimiter.
                if self.buffer[:2] != header:
                    pos = self.buffer.find(header)
                    if pos < 0:
                        # Keep the last byte: it may be the first header byte.
                        self.buffer = self.buffer[-1:]
                        break
                    self.buffer = self.buffer[pos:]
                    print("RESEEK", len(self.buffer))
                    continue
                ct, lsn = struct.unpack_from('<2B', self.buffer, 2)
                packet_len = lsn * 2 + header_len
                if len(self.buffer) < packet_len:
                    break
                datas = struct.unpack_from('<{}H'.format(packet_len // 2),
                                           self.buffer)
                # Words: [0]=header [1]=CT|LSN [2]=FSA [3]=LSA [4]=checksum [5:]=samples.
                # Bit 0 of both FSA and LSA is a check bit that must be 1.
                if (datas[2] & 1) == 0 or (datas[3] & 1) == 0:
                    self.buffer = self.buffer[2:]  # check bit error: reseek
                    continue
                check = 0
                for word in datas:
                    check ^= word
                if check != 0:
                    self.buffer = self.buffer[2:]  # checksum error: reseek
                    continue
                self.buffer = self.buffer[packet_len:]
                # Angles are stored shifted left by one bit in 1/64 degree units.
                fsa = (datas[2] >> 1) / 64
                lsa = (datas[3] >> 1) / 64
                _the_data.add_packet(ct, fsa, lsa, datas[5:])

        def connection_lost(self, exc):
            """Close the underlying serial port when the connection ends."""
            print('port closed')
            self.transport.serial.close()

    return _the_data, YD_X4_packet
##########################
# Sample code starts here
########################
import cv2
out_fps=10  # frames per second: sets the processing period (1/out_fps) and the output video rate
height=480  # height in pixels of each LIDAR panel image
width=height*2  # width in pixels of each LIDAR panel image
min_range=10  # NOTE(review): not referenced by the visible code
max_range=2*1000 #mm; fill_poly divides distances by this and clips the result to 1
thickness=400  # passed as `adder` to fill_poly when drawing the thickened outline (img_t)
erode_kanerl_size=21  # side length of the square kernel used with cv2.erode
max_img_thresh=0.9  # threshold applied to the running average image max_img
PORT="com3"  # serial port passed to serial_asyncio.create_serial_connection
cam_id=0  # camera device index (presumably meant for cv2.VideoCapture)
cam_h_direc=-1  # column step used when copying the camera frame; -1 mirrors it horizontally
fmt = cv2.VideoWriter_fourcc(*'mp4v') # output file format (mp4 here)
def fill_poly(img,col,angles,dists,rate=1,adder=0):
    """Draw the polygon spanned by a polar scan into ``img`` (in place).

    Samples with a non-positive distance are dropped. The remaining
    distances are normalised by the global ``max_range`` and clipped to 1,
    ordered by angle, scaled by ``rate`` and offset by ``adder / max_range``.
    The polygon origin sits at pixel ``(height, 0)``.

    Args:
        img: Image array that cv2.fillPoly draws into.
        col: Fill colour handed to cv2.fillPoly.
        angles: Sample angles in degrees.
        dists: Sample distances, in the same unit as ``max_range``.
        rate: Multiplicative factor applied to the normalised radius.
        adder: Offset added to the radius, in the same unit as ``max_range``.
    """
    keep = np.nonzero(0 < dists)[0]
    theta = np.pi * angles[keep] / 180
    radius = np.minimum(dists[keep] / max_range, 1)

    # Walk the outline in increasing angle so the polygon does not self-cross.
    order = np.argsort(theta)
    theta = theta[order]
    radius = radius[order] * rate + adder / max_range

    px = (np.cos(theta) * radius + 1) * height
    py = (np.sin(theta) * radius) * height
    vertices = np.array((px, py)).transpose().astype(np.int32)
    cv2.fillPoly(img, [vertices], color=col)
# Main part of the visualisation code
async def async_main():
    """Render LIDAR scans next to the camera image, show and record them.

    Every ``1 / out_fps`` seconds the latest corrected scan from the global
    ``yd_data`` is drawn. The output frame is made of four panels:

    * top-left: camera frame (columns stepped by ``cam_h_direc``),
    * top-right: polygon of the current scan,
    * bottom-left: running average of the shrunken scan polygon (channel 0)
      and its thresholded mask (channel 2),
    * bottom-right: overlay of the thickened scan, the thresholded average
      and the current scan; pixels covered by the first two but not by the
      current scan are highlighted.

    Keys: ESC quits, space toggles updating of the running average.
    """
    update_max = True
    span = 1 / out_fps
    kernel = np.ones((erode_kanerl_size, erode_kanerl_size), np.uint8)
    start = time.time() + span
    # np.float was removed in NumPy 1.24; float64 is the type it aliased.
    max_img = np.zeros((height, width), dtype=np.float64)
    out_img = np.zeros((height * 2, width * 2, 3), dtype=np.uint8)
    cap_cam = cv2.VideoCapture(cam_id)
    cap_cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap_cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cv2.namedWindow("out")
    await asyncio.sleep(0.5)
    # Create the video writer.
    writer = cv2.VideoWriter(
        'lidar_x4_fps{}start{}.mp4'.format(out_fps, start + span),
        fmt, out_fps, (width * 2, height * 2), isColor=True)
    try:
        while True:
            m_img = np.zeros((height, width), dtype=np.uint8)
            img = np.zeros((height, width), dtype=np.uint8)
            img_t = np.zeros((height, width), dtype=np.uint8)
            # Sleeping here hands control back to the event loop so the
            # serial protocol keeps receiving, and paces the output rate.
            while time.time() < start:
                await asyncio.sleep(0.01)
            start += span

            data = yd_data.corrected_data()
            fill_poly(img, 1, data["ang"], data["dist"])
            fill_poly(img_t, 1, data["ang"], data["dist"], adder=thickness)
            img_t = cv2.erode(img_t, kernel, iterations=1)
            if update_max:
                # Blend a slightly shrunken scan into the running average.
                fill_poly(m_img, 1, data["ang"], data["dist"], rate=0.95, adder=-100)
                max_img = max_img * 0.9 + 0.1 * m_img
            # m_img becomes the mask of pixels whose average exceeds the threshold.
            m_img *= 0
            m_img[max_img_thresh < max_img] += 1

            out_img *= 0
            out_img[:height, width:, :] = img.reshape(height, width, 1) * 255
            out_img[height:, :width, 0] = max_img * 255
            out_img[height:, :width, 2] = (max_img_thresh < max_img) * 255
            # Bit-code the three masks; value 3 means img_t and m_img set, img clear.
            diff_img = img_t + m_img * 2 + img * 4
            diff_img = (diff_img == 3).astype(np.uint8)
            out_img[height:, width:, 0] = img_t * 100 + 155 * diff_img
            out_img[height:, width:, 1] = m_img * 100 + 155 * diff_img
            out_img[height:, width:, 2] = img * 100

            ok, frame = cap_cam.read()
            # A failed grab returns no frame; keep that panel black instead of crashing.
            if ok and frame is not None:
                out_img[:frame.shape[0], :frame.shape[1], :] = frame[:, ::cam_h_direc, :]
            cv2.putText(out_img, 'Update_max: {}'.format(update_max),
                        (width // 2, height - 20), cv2.FONT_HERSHEY_SIMPLEX,
                        1.0, (200, 200, 200), thickness=2)
            writer.write(out_img)
            cv2.imshow("out", out_img)
            key = cv2.waitKey(1)
            if key == 27:  # ESC key quits
                break
            if key == ord(' '):
                update_max = not update_max
                print("update ")
    finally:
        # Release the window, camera and video file even if the loop raised.
        cv2.destroyAllWindows()
        cap_cam.release()
        writer.release()
# Everything runs on asyncio so that nothing blocks while waiting for serial I/O.
loop = asyncio.get_event_loop()
# yd_data is the object the main coroutine reads scan data from.
# proto is the protocol class handed to pySerial-asyncio.
# get_YD_X4 also accepts a call_back that runs once per completed scan; heavy
# work there is possible, but slow processing lets serial data pile up in the buffer.
yd_data,proto=get_YD_X4()
serial_task = serial_asyncio.create_serial_connection(loop, proto,
                                                      PORT, baudrate=128000)
transport,portocol=loop.run_until_complete(serial_task)
try:
    loop.run_until_complete(async_main())
finally:
    # Close the port even when async_main raises, so it is not left open.
    transport.serial.close()
# loop.stop()  # under Spyder, not stopping the loop makes debugging easier
# loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment