Created
May 16, 2021 08:13
-
-
Save akirayou/9e7c4e919edfe67dea43310d2b068c86 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Example implementation for YDLIDAR X4
""" | |
#pip install pyserial-asyncio | |
import serial_asyncio | |
import asyncio | |
import time | |
# Spyder / IPython already run their own asyncio event loop, which conflicts
# with the loop used here, so nest_asyncio is applied to allow nesting.
# (You need this to run from the Spyder console and similar environments.)
import nest_asyncio | |
nest_asyncio.apply() | |
import struct | |
import numpy as np | |
###
# This single function is all the code needed to use the YDLIDAR X4 through
# pyserial-asyncio. It can be copied as-is into another project.
###
def get_YD_X4(call_back=None):
    """Create the scan store and the asyncio protocol class for a YDLIDAR X4.

    Args:
        call_back: Optional callable. It is invoked with the completed scan
            dict ``{"ang": ndarray, "dist": ndarray}`` each time the incoming
            angles wrap around, i.e. one revolution has been assembled.

    Returns:
        A ``(data, protocol_class)`` tuple. ``data`` accumulates packets and
        exposes ``last_data`` and ``corrected_data()``; ``protocol_class`` is
        an ``asyncio.Protocol`` subclass that parses the serial byte stream
        and feeds ``data``.
    """

    class YD_X4_Data():
        """Accumulates parsed packets into complete revolutions."""

        def __init__(self, call_back):
            # Last completed revolution: angles in degrees, distances = raw/4.
            self.last_data = {"ang": np.array([]), "dist": np.array([])}
            # End angle of the previously accepted packet (wrap detection).
            self.last_angle = 0
            # Samples of the revolution that is still being collected.
            self._store_angles = np.array([], dtype=np.float64)
            self._store_dists = np.array([], dtype=np.float64)
            self.call_back = call_back

        def corrected_data(self):
            """Return the last revolution with a distance-dependent angle fix.

            Only samples whose distance exceeds 0.1 are corrected; the others
            keep their raw angle. ``last_data`` itself is not modified
            (the angles are copied first).
            """
            angles = np.copy(self.last_data["ang"])
            dists = self.last_data["dist"]
            vidx = np.nonzero(0.1 < dists)[0]
            # Fitted correction, taken from:
            # https://bitbucket.org/akira_you/esp-ydlidarx4/src/master/main/ydlidarTask.cpp
            angles[vidx] += (
                1 / (0.0008162604522317974 * dists[vidx] - 0.00220016337230633)
                - 7.990596267964586
            )
            # Alternative formula kept from the original author for reference:
            # angles[vidx]+= 180/np.pi*np.arctan(21.8*(155.3-dists[vidx])/(155.3*dists[vidx]) )
            return {"ang": angles, "dist": dists}

        def add_packet(self, ct, fsa, lsa, samples):
            """Append one packet; publish a revolution when the angle wraps.

            Args:
                ct: Packet type byte; packets with ``ct != 0`` are ignored.
                fsa: Angle of the first sample, in degrees.
                lsa: Angle of the last sample, in degrees.
                samples: Sequence of raw distance words (distance = raw / 4).
            """
            if ct != 0:
                return
            dists = np.array(samples) / 4
            # Unwrap the end angle so the interpolation runs forwards.
            r_lsa = lsa
            if lsa < fsa:
                r_lsa += 360
            angles = np.linspace(fsa, r_lsa, len(dists))
            angles[360 < angles] -= 360
            if lsa < self.last_angle:
                # The revolution ended inside this packet: split at the first
                # sample that lies past the wrap. If no sample qualifies
                # (empty packet, or an end angle of exactly 0 that stays at
                # 360 after unwrapping) the whole packet belongs to the old
                # revolution instead of raising IndexError.
                wrapped = np.nonzero(angles < self.last_angle)[0]
                zero_cross = wrapped[0] if wrapped.size else len(angles)
                self.last_data = {
                    "ang": np.r_[self._store_angles, angles[:zero_cross]],
                    "dist": np.r_[self._store_dists, dists[:zero_cross]],
                }
                # Use the stored attribute (the one that was just checked)
                # rather than the enclosing function's argument.
                if self.call_back is not None:
                    self.call_back(self.last_data)
                self._store_angles = angles[zero_cross:]
                self._store_dists = dists[zero_cross:]
            else:
                self._store_angles = np.r_[self._store_angles, angles]
                self._store_dists = np.r_[self._store_dists, dists]
            self.last_angle = lsa
            #print("get packet ",ct,fsa,lsa)

    _the_data = YD_X4_Data(call_back)

    class YD_X4_packet(asyncio.Protocol):
        """Serial protocol: finds, validates and decodes scan packets."""

        def connection_made(self, transport):
            """Start the device and prepare the receive buffer."""
            self.transport = transport
            print('port opened', transport)
            transport.serial.dtr = True  # motor start
            # The command is sent twice, 0.1 s apart (presumably the
            # "start scanning" command - confirm against the device manual).
            # NOTE: time.sleep blocks the event loop for that 0.1 s.
            transport.serial.write(b"\xA5\x60")
            time.sleep(0.1)
            transport.serial.write(b"\xA5\x60")
            print("connected")
            self.buffer = b''
            self.serial = transport.serial

        def data_received(self, recv_data):
            """Buffer incoming bytes and decode every complete packet."""
            self.buffer += recv_data
            # In this loop: break = "need more data", continue = "re-parse".
            # The loop condition already guarantees more than 12 buffered
            # bytes, so no separate minimum-length check is needed.
            while 12 < len(self.buffer):
                # Seek a candidate for the header delimiter.
                if self.buffer[:2] != b"\xaa\x55":
                    p = self.buffer.find(b"\xaa\x55")
                    if p < 0:
                        # Keep the last byte: it may be the first header byte.
                        self.buffer = self.buffer[-1:]
                        break
                    self.buffer = self.buffer[p:]
                    print("RESEEK", len(self.buffer))
                    continue
                (ct, lsn) = struct.unpack('<2B', self.buffer[2:4])
                packet_len = lsn * 2 + 10
                #print("packet_length",packet_len,"ct",ct)
                if len(self.buffer) < packet_len:
                    break
                datas = struct.unpack('<{}H'.format(packet_len // 2),
                                      self.buffer[:packet_len])
                # Both angle words must have their check bit (bit 0) set.
                # The previous LSA test `datas[3]&0x80==1` could never be
                # true (the masked value is 0 or 128), so corrupted end
                # angles were not rejected here.
                if (datas[2] & 1) == 0 or (datas[3] & 1) == 0:
                    self.buffer = self.buffer[2:]  # check bit error: reseek
                    continue
                # XOR of every 16-bit word, checksum word included, must be 0.
                check = 0
                for d in datas:
                    check ^= d
                if check != 0:
                    self.buffer = self.buffer[2:]  # checksum error: reseek
                    continue
                self.buffer = self.buffer[packet_len:]
                # Process the packet: angles are stored shifted left by one
                # bit, in 1/64 degree units.
                fsa = (datas[2] >> 1) / 64
                lsa = (datas[3] >> 1) / 64
                _the_data.add_packet(ct, fsa, lsa, datas[5:])
            #print('data received', repr(recv_data))

        def connection_lost(self, exc):
            """Close the underlying serial port."""
            print('port closed')
            self.transport.serial.close()

    return _the_data, YD_X4_packet
##########################
# Sample code starts here
########################
import cv2 | |
# --- Settings for the visualisation sample ---
out_fps=10  # frame rate of the written video; async_main processes one frame every 1/out_fps seconds
height=480  # height in pixels of one LIDAR image panel
width=height*2  # width in pixels of one LIDAR image panel
min_range=10  # NOTE(review): not referenced by the code visible in this file
max_range=2*1000 #mm; distances are divided by this (and clipped to 1) before drawing
thickness=400  # passed to fill_poly as `adder` when drawing the enlarged polygon
erode_kanerl_size=21  # side length of the square kernel used for cv2.erode
max_img_thresh=0.9  # threshold applied to the running-average image
PORT="com3"  # serial port name handed to create_serial_connection
cam_id=0  # presumably the OpenCV camera index - confirm against its use
cam_h_direc=-1  # slice step along the horizontal axis of the camera frame (-1 mirrors it)
fmt = cv2.VideoWriter_fourcc(*'mp4v') # file format (mp4 here)
def fill_poly(img,col,angles,dists,rate=1,adder=0):
    """Draw one scan as a filled polygon into ``img`` (modified in place).

    Args:
        img: Image array handed to ``cv2.fillPoly``.
        col: Fill colour/value.
        angles: Sample angles in degrees.
        dists: Sample distances, in the same unit as ``max_range``.
        rate: Factor applied to the normalised distances.
        adder: Offset added to every distance, in the unit of ``max_range``.

    Samples whose distance is not positive are skipped. When no sample is
    usable (for example before the first complete revolution has arrived)
    nothing is drawn.
    """
    valid = np.nonzero(0 < dists)[0]
    if valid.size == 0:
        # Nothing to draw; avoid passing an empty polygon to OpenCV.
        return
    va = np.pi * angles[valid] / 180
    # Normalise to [0, 1] and clip everything beyond max_range.
    vd = dists[valid] / max_range
    vd[1 < vd] = 1
    # Order the vertices by angle so the outline is traversed in sequence.
    sidx = np.argsort(va)
    va = va[sidx]
    vd = vd[sidx] * rate + adder / max_range
    # The origin is placed at x == height (the horizontal centre), y == 0.
    x = (np.cos(va) * vd + 1) * height
    y = (np.sin(va) * vd) * height
    points = np.array((x, y)).transpose().astype(np.int32)
    #points=np.r_[points,np.array((width/2,0)).reshape(1,2).astype(np.int32) ]
    cv2.fillPoly(img, [points], color=col)
# Main part of the visualisation code
async def async_main():
    """Render LIDAR scans next to the camera image, show and record them.

    Runs until ESC is pressed. The space key toggles whether the
    running-average image keeps being updated. Every ``1/out_fps`` seconds
    the latest corrected scan is read from the module-level ``yd_data``,
    drawn, composed into one frame together with the camera picture, shown
    in the "out" window and appended to an mp4 file.
    """
    update_max = True
    span = 1 / out_fps
    kernel = np.ones((erode_kanerl_size, erode_kanerl_size), np.uint8)
    start = time.time() + span
    # np.float was removed in NumPy 1.24; float64 is the type it aliased.
    max_img = np.zeros((height, width), dtype=np.float64)
    out_img = np.zeros((height * 2, width * 2, 3), dtype=np.uint8)
    # Honour the cam_id setting instead of a hard-coded device index.
    cap_cam = cv2.VideoCapture(cam_id)
    cap_cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap_cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    cv2.namedWindow("out")
    await asyncio.sleep(0.5)
    # Create the video writer.
    writer = cv2.VideoWriter('lidar_x4_fps{}start{}.mp4'.format(out_fps,start+span), fmt, out_fps, (width*2,height*2),isColor=True)
    try:
        while True:
            m_img = np.zeros((height, width), dtype=np.uint8)
            img = np.zeros((height, width), dtype=np.uint8)
            img_t = np.zeros((height, width), dtype=np.uint8)
            # Sleep in small steps so the serial protocol keeps receiving
            # data while waiting for the next frame time.
            while time.time() < start:
                await asyncio.sleep(0.01)
            start += span

            data = yd_data.corrected_data()
            fill_poly(img, 1, data["ang"], data["dist"])
            # Enlarged polygon, then eroded.
            fill_poly(img_t, 1, data["ang"], data["dist"], adder=thickness)
            img_t = cv2.erode(img_t, kernel, iterations=1)
            if update_max:
                # Exponential running average of a slightly shrunken polygon.
                fill_poly(m_img, 1, data["ang"], data["dist"], rate=0.95, adder=-100)
                max_img = max_img * 0.9 + 0.1 * m_img
                #max_img/=np.max(max_img)
            # m_img becomes the thresholded running average (also when the
            # average itself is frozen).
            m_img *= 0
            m_img[max_img_thresh < max_img] += 1

            out_img *= 0
            # Top-right: current scan.
            out_img[:height, width:, :] = img.reshape(height, width, 1) * 255
            # Bottom-left: running average and its thresholded mask.
            out_img[height:, :width, 0] = max_img * 255
            out_img[height:, :width, 2] = (max_img_thresh < max_img) * 255
            # Bottom-right: highlight pixels set in img_t and m_img but not
            # in img (1 + 2 + 0 == 3).
            diff_img = img_t + m_img * 2 + img * 4
            diff_img = (diff_img == 3).astype(np.uint8)
            out_img[height:, width:, 0] = img_t * 100 + 155 * diff_img
            out_img[height:, width:, 1] = m_img * 100 + 155 * diff_img
            out_img[height:, width:, 2] = img * 100

            # Top-left: camera image. A failed read yields no frame; skip
            # the overlay for this frame instead of crashing.
            ok, frame = cap_cam.read()
            if ok and frame is not None:
                out_img[:frame.shape[0], :frame.shape[1], :] = frame[:, ::cam_h_direc, :]
            cv2.putText(out_img, 'Update_max: {}'.format(update_max), (width//2, height-20), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (200,200,200), thickness=2)
            writer.write(out_img)
            cv2.imshow("out", out_img)

            key = cv2.waitKey(1)
            if key == 27:  # ESC key quits
                break
            if key == ord(' '):
                update_max = not update_max
                print("update ")
    finally:
        # Release the window, the camera and the video file even when the
        # loop ends with an exception.
        cv2.destroyAllWindows()
        cap_cam.release()
        writer.release()
# Implemented asynchronously with asyncio (we do not want to block on I/O).
loop = asyncio.get_event_loop()
# yd_data is where the data read by the main coroutine lives.
# proto is the protocol class for pySerial-asyncio.
# A call_back can be passed to get_YD_X4 here; heavy processing inside the
# call_back is fine, but note that data piles up in the buffer if it is slow.
yd_data,proto=get_YD_X4()
serial_task = serial_asyncio.create_serial_connection(loop, proto,
                                                      PORT, baudrate=128000)
transport,portocol=loop.run_until_complete(serial_task)
try:
    loop.run_until_complete(async_main())
finally:
    # Close the port even if the visualisation fails, so it is not left
    # open for the next run.
    transport.serial.close()
#loop.stop() # not stopping the loop makes debugging in Spyder easier
#loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment