Skip to content

Instantly share code, notes, and snippets.

@akirayou
Created May 15, 2021 16:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akirayou/6aaefb718a54a8592b62ae825a95846f to your computer and use it in GitHub Desktop.
Save akirayou/6aaefb718a54a8592b62ae825a95846f to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
"""
Example implimentation for YDLIDAR X4
"""
#pip install pyserial-asyncio
import serial_asyncio
import asyncio
import time
#spyderとかipython環境で使うときはシステムですでにasyncio使ってて喧嘩するので
#nest_asyncioを使う
#to run on spyder console and so on you need this
import nest_asyncio
nest_asyncio.apply()
import struct
import numpy as np
###
#この関数1個がpyserial-asyncからYD LIDAR X4を使うためのコード
#何も考えずにコピーすればOK
###
def get_YD_X4(call_back=None):
class YD_X4_Data():
def __init__(self,call_back):
self.last_data={"ang":np.array([] ),"dist":np.array([] ) }
self.last_angle=0
self._store_angles=np.array([],dtype=np.float64)
self._store_dists=np.array([],dtype=np.float64)
self.call_back=call_back
def corrected_data(self):
angles=np.copy(self.last_data["ang"])
dists=self.last_data["dist"]
vidx=np.nonzero(0.1<dists)[0]
#https://bitbucket.org/akira_you/esp-ydlidarx4/src/master/main/ydlidarTask.cpp
angles[vidx]+= 1/( 0.0008162604522317974 * dists[vidx] -0.00220016337230633 )- 7.990596267964586;
#angles[vidx]+= 180/np.pi*np.arctan(21.8*(155.3-dists[vidx])/(155.3*dists[vidx]) )
return {"ang":angles,"dist":dists}
def add_packet(self,ct,fsa,lsa,samples):
if(ct!=0):return
dists=np.array(samples)/4
r_lsa=lsa
if(lsa<fsa):r_lsa+=360
angles=np.linspace(fsa,r_lsa,len(dists))
angles[360<angles]-=360
if(lsa<self.last_angle):
zero_corss=np.nonzero(angles<self.last_angle)[0][0]
self.last_data={
"ang":np.r_[self._store_angles,angles[:zero_corss]],
"dist":np.r_[self._store_dists,dists[:zero_corss]],
}
if(self.call_back is not None):call_back(self.last_data)
self._store_angles=angles[zero_corss:]
self._store_dists=dists[zero_corss:]
else:
self._store_angles=np.r_[self._store_angles,angles]
self._store_dists=np.r_[self._store_dists,dists]
self.last_angle=lsa
#print("get packet ",ct,fsa,lsa)
_the_data=YD_X4_Data(call_back)
class YD_X4_packet(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print('port opened', transport)
transport.serial.dtr = True #motor start
transport.serial.write(b"\xA5\x60")
time.sleep(0.1)
transport.serial.write(b"\xA5\x60")
print("connected")
self.buffer=b'';
self.serial=transport.serial
def data_received(self,recv_data):
self.buffer+=recv_data
#In this loop break = "need more data" continue ="need to re-parse"
while (12<len(self.buffer)):
#seek candidate of header dlimitor
if(self.buffer[:2] != b"\xaa\x55"):
p=self.buffer.find(b"\xaa\x55")
if(p<0):
self.buffer=self.buffer[-1:]
break
self.buffer=self.buffer[p:]
print("RESEEK",len(self.buffer))
continue
#shorter than minimum length of the packet
if(len(self.buffer)<12):break
(ct,lsn)=struct.unpack('<2B',self.buffer[2:4])
packet_len=lsn*2+10
#print("packet_length",packet_len,"ct",ct)
if( len(self.buffer) <packet_len ): break
datas=struct.unpack('<{}H'.format(packet_len//2),
self.buffer[:packet_len])
if(datas[2]&1 ==0 or datas[3]&0x80==1):
self.buffer=self.buffer[2:] #FLA LSA check bit error reseek
continue
check=0
for d in datas: check^=d
if(check!=0):
self.buffer=self.buffer[2:] #checksum error reseek
continue
self.buffer=self.buffer[packet_len:]
#Do packet process
fsa=(datas[2]>>1)/64
lsa=(datas[3]>>1)/64
_the_data.add_packet(ct,fsa,lsa,datas[5:])
#print('data received', repr(recv_data))
def connection_lost(self, exc):
print('port closed')
self.transport.serial.close()
return _the_data,YD_X4_packet
##########################
#ここからがサンプルコード
########################
PORT="com3"
USE_EVENT=True
count=0
if(USE_EVENT):
event=asyncio.Event()
#一周分のデータが集まる度に呼ばれるコールバック(なくてもOK)
def data_count(data):
global count,event
count+=1
if USE_EVENT:
event.set()#データを即時取るためにはeventでasync_mainに教えてあげる
import cv2
import math
#可視化コードのメイン部分
async def async_main():
global yd_data,count,event
#可視化解像度と最大描画距離の設定
width=1024
height=1024
max_d=3*1000
while(True):
img=np.zeros((height,width),dtype=np.uint8)
if(USE_EVENT):
await asyncio.sleep(0.01)#event.waitだけだと,mainタスクが処理を占有してしまうリスクがあるので、適宜sleepをかける
await event.wait() #データがくるまで待つ
event.clear()
else:
await asyncio.sleep(0.1)#一定時間ごとに最新のデータを処理するには適当なところでsleepを入れておく
#角度補正済みのデータを取得(角度が負の値をとったり、距離に応じて前後したりする)
#補正が不要なときは yd_data.last_dataを使う
data=yd_data.corrected_data()
#可視化のロジック
#distが0なのは計測出来なかった点、装置的な有効距離は(0.12~10m程度)
for a,d in zip(np.pi*data["ang"]/180,data["dist"]):
x=int( width*(0.5 +0.5*d/max_d*math.cos(a)))
y=int( height*(0.5 +0.5*d/max_d*math.sin(a)))
if(0<=x<width and 0<=y<height):
img[y,x]=255
cv2.imshow("scan",img)
if(27==cv2.waitKey(1)):break #ESCキーで終了
print("count",count)
cv2.destroyAllWindows()
#asyncioを使って非同期に実装する(IO待ちしたくないので)
loop = asyncio.get_event_loop()
#yd_dataがmainから触るデータが入る所
#protoがpySerial-asyncioのためのもの
#ここでcall_backを指定しているのでcall_backでゴリゴリ処理してもOK(処理が重いとバッファにデータが貯まるので注意)
yd_data,proto=get_YD_X4(call_back=data_count)
serial_task = serial_asyncio.create_serial_connection(loop, proto,
PORT, baudrate=128000)
transport,portocol=loop.run_until_complete(serial_task)
loop.run_until_complete(async_main())
transport.serial.close()
#loop.stop() #spyderではloop.stopしないほうがデバグがら楽
#loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment