Skip to content

Instantly share code, notes, and snippets.

@allrobot
Created March 31, 2022 04:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save allrobot/1547447f313942f278118cb2e569f59f to your computer and use it in GitHub Desktop.
Save allrobot/1547447f313942f278118cb2e569f59f to your computer and use it in GitHub Desktop.
import numpy as np
import sys
import time
import asyncio
import logging
from bleak import BleakClient
EMG = np.zeros(6)
logger = logging.getLogger()
# 运行BLE客户端
async def run_ble_client(address: str, char_uuid: str, queue: asyncio.Queue):
# 将一个目标数据放入队列中。如果队列已满,则在添加项之前等待,直到有空队列可用
async def callback_handler(sender, data):
await queue.put((time.time(), data))
# BLE通常的连接间隔约为30到50毫秒,因此每秒只能发送20到30个数据包。
# HID设备可以有15毫秒的最快间隔(取决于配置),即每秒66个数据包。
# sEMG每秒发送36000字节,需调整
# 大多数设备的最大MTU为512字节,因此这将产生66 * 512 = 大约34000字节每秒
# 因此,ESP32上的缓冲区可能因试图以比可能更快的速度发送数据而溢出。
# 创建一个客户端事件,并且定时决定什么时候中断接收
async with BleakClient(address) as client:
# 打印BLE连接状态
logger.info(f"Connected: {client.is_connected}")
# 开启客户端通知
await client.start_notify(char_uuid, callback_handler)
while True:
await asyncio.sleep(3600.0)
# await client.stop_notify(char_uuid)
# Send an "exit command to the consumer"退出命令到外部设备
await queue.put((time.time(), None))
# 打印外部设备传来的信息队列
async def run_queue_consumer(queue: asyncio.Queue):
while True:
# Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a timeout for getting data.
# 如果你需要一个获取数据的超时时间,在asyncio.wait_for(queue.get(), timeout=1.0)设置timeout值
epoch, data = await queue.get()
# 如果未收到信息,输出
if data is None:
logger.info(
"Got message from client about disconnection. Exiting consumer loop..."
)
break
else:
EMG = str(data, "utf-8").split(',')
print(EMG)
print(queue.full())
async def main(address: str, char_uuid: str):
# 设置队列,用于协调生产者和消费者协程。
queue = asyncio.Queue()
# 创建客户端任务
client_task = run_ble_client(address, char_uuid, queue)
# 运行打印循环
consumer_task = run_queue_consumer(queue)
#
await asyncio.gather(client_task, consumer_task)
logger.info("Main method done.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment