Created
May 15, 2021 16:02
-
-
Save akirayou/6aaefb718a54a8592b62ae825a95846f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
""" | |
Example implimentation for YDLIDAR X4 | |
""" | |
#pip install pyserial-asyncio | |
import serial_asyncio | |
import asyncio | |
import time | |
#spyderとかipython環境で使うときはシステムですでにasyncio使ってて喧嘩するので | |
#nest_asyncioを使う | |
#to run on spyder console and so on you need this | |
import nest_asyncio | |
nest_asyncio.apply() | |
import struct | |
import numpy as np | |
### | |
#この関数1個がpyserial-asyncからYD LIDAR X4を使うためのコード | |
#何も考えずにコピーすればOK | |
### | |
def get_YD_X4(call_back=None): | |
class YD_X4_Data(): | |
def __init__(self,call_back): | |
self.last_data={"ang":np.array([] ),"dist":np.array([] ) } | |
self.last_angle=0 | |
self._store_angles=np.array([],dtype=np.float64) | |
self._store_dists=np.array([],dtype=np.float64) | |
self.call_back=call_back | |
def corrected_data(self): | |
angles=np.copy(self.last_data["ang"]) | |
dists=self.last_data["dist"] | |
vidx=np.nonzero(0.1<dists)[0] | |
#https://bitbucket.org/akira_you/esp-ydlidarx4/src/master/main/ydlidarTask.cpp | |
angles[vidx]+= 1/( 0.0008162604522317974 * dists[vidx] -0.00220016337230633 )- 7.990596267964586; | |
#angles[vidx]+= 180/np.pi*np.arctan(21.8*(155.3-dists[vidx])/(155.3*dists[vidx]) ) | |
return {"ang":angles,"dist":dists} | |
def add_packet(self,ct,fsa,lsa,samples): | |
if(ct!=0):return | |
dists=np.array(samples)/4 | |
r_lsa=lsa | |
if(lsa<fsa):r_lsa+=360 | |
angles=np.linspace(fsa,r_lsa,len(dists)) | |
angles[360<angles]-=360 | |
if(lsa<self.last_angle): | |
zero_corss=np.nonzero(angles<self.last_angle)[0][0] | |
self.last_data={ | |
"ang":np.r_[self._store_angles,angles[:zero_corss]], | |
"dist":np.r_[self._store_dists,dists[:zero_corss]], | |
} | |
if(self.call_back is not None):call_back(self.last_data) | |
self._store_angles=angles[zero_corss:] | |
self._store_dists=dists[zero_corss:] | |
else: | |
self._store_angles=np.r_[self._store_angles,angles] | |
self._store_dists=np.r_[self._store_dists,dists] | |
self.last_angle=lsa | |
#print("get packet ",ct,fsa,lsa) | |
_the_data=YD_X4_Data(call_back) | |
class YD_X4_packet(asyncio.Protocol): | |
def connection_made(self, transport): | |
self.transport = transport | |
print('port opened', transport) | |
transport.serial.dtr = True #motor start | |
transport.serial.write(b"\xA5\x60") | |
time.sleep(0.1) | |
transport.serial.write(b"\xA5\x60") | |
print("connected") | |
self.buffer=b''; | |
self.serial=transport.serial | |
def data_received(self,recv_data): | |
self.buffer+=recv_data | |
#In this loop break = "need more data" continue ="need to re-parse" | |
while (12<len(self.buffer)): | |
#seek candidate of header dlimitor | |
if(self.buffer[:2] != b"\xaa\x55"): | |
p=self.buffer.find(b"\xaa\x55") | |
if(p<0): | |
self.buffer=self.buffer[-1:] | |
break | |
self.buffer=self.buffer[p:] | |
print("RESEEK",len(self.buffer)) | |
continue | |
#shorter than minimum length of the packet | |
if(len(self.buffer)<12):break | |
(ct,lsn)=struct.unpack('<2B',self.buffer[2:4]) | |
packet_len=lsn*2+10 | |
#print("packet_length",packet_len,"ct",ct) | |
if( len(self.buffer) <packet_len ): break | |
datas=struct.unpack('<{}H'.format(packet_len//2), | |
self.buffer[:packet_len]) | |
if(datas[2]&1 ==0 or datas[3]&0x80==1): | |
self.buffer=self.buffer[2:] #FLA LSA check bit error reseek | |
continue | |
check=0 | |
for d in datas: check^=d | |
if(check!=0): | |
self.buffer=self.buffer[2:] #checksum error reseek | |
continue | |
self.buffer=self.buffer[packet_len:] | |
#Do packet process | |
fsa=(datas[2]>>1)/64 | |
lsa=(datas[3]>>1)/64 | |
_the_data.add_packet(ct,fsa,lsa,datas[5:]) | |
#print('data received', repr(recv_data)) | |
def connection_lost(self, exc): | |
print('port closed') | |
self.transport.serial.close() | |
return _the_data,YD_X4_packet | |
########################## | |
#ここからがサンプルコード | |
######################## | |
PORT="com3" | |
USE_EVENT=True | |
count=0 | |
if(USE_EVENT): | |
event=asyncio.Event() | |
#一周分のデータが集まる度に呼ばれるコールバック(なくてもOK) | |
def data_count(data): | |
global count,event | |
count+=1 | |
if USE_EVENT: | |
event.set()#データを即時取るためにはeventでasync_mainに教えてあげる | |
import cv2 | |
import math | |
#可視化コードのメイン部分 | |
async def async_main(): | |
global yd_data,count,event | |
#可視化解像度と最大描画距離の設定 | |
width=1024 | |
height=1024 | |
max_d=3*1000 | |
while(True): | |
img=np.zeros((height,width),dtype=np.uint8) | |
if(USE_EVENT): | |
await asyncio.sleep(0.01)#event.waitだけだと,mainタスクが処理を占有してしまうリスクがあるので、適宜sleepをかける | |
await event.wait() #データがくるまで待つ | |
event.clear() | |
else: | |
await asyncio.sleep(0.1)#一定時間ごとに最新のデータを処理するには適当なところでsleepを入れておく | |
#角度補正済みのデータを取得(角度が負の値をとったり、距離に応じて前後したりする) | |
#補正が不要なときは yd_data.last_dataを使う | |
data=yd_data.corrected_data() | |
#可視化のロジック | |
#distが0なのは計測出来なかった点、装置的な有効距離は(0.12~10m程度) | |
for a,d in zip(np.pi*data["ang"]/180,data["dist"]): | |
x=int( width*(0.5 +0.5*d/max_d*math.cos(a))) | |
y=int( height*(0.5 +0.5*d/max_d*math.sin(a))) | |
if(0<=x<width and 0<=y<height): | |
img[y,x]=255 | |
cv2.imshow("scan",img) | |
if(27==cv2.waitKey(1)):break #ESCキーで終了 | |
print("count",count) | |
cv2.destroyAllWindows() | |
#asyncioを使って非同期に実装する(IO待ちしたくないので) | |
loop = asyncio.get_event_loop() | |
#yd_dataがmainから触るデータが入る所 | |
#protoがpySerial-asyncioのためのもの | |
#ここでcall_backを指定しているのでcall_backでゴリゴリ処理してもOK(処理が重いとバッファにデータが貯まるので注意) | |
yd_data,proto=get_YD_X4(call_back=data_count) | |
serial_task = serial_asyncio.create_serial_connection(loop, proto, | |
PORT, baudrate=128000) | |
transport,portocol=loop.run_until_complete(serial_task) | |
loop.run_until_complete(async_main()) | |
transport.serial.close() | |
#loop.stop() #spyderではloop.stopしないほうがデバグがら楽 | |
#loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment