Skip to content

Instantly share code, notes, and snippets.

@allrobot
Created April 16, 2022 02:08
Show Gist options
  • Save allrobot/0fcd8c06f1351669445f5263348fd5ad to your computer and use it in GitHub Desktop.
Save allrobot/0fcd8c06f1351669445f5263348fd5ad to your computer and use it in GitHub Desktop.
import ctypes
import asyncio
import logging
import EMG_NUM
import numpy as np
from bleak import BleakClient
logger = logging.getLogger()
# 运行BLE客户端
def callback_handler(sender,data):
# Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a timeout for getting data.
# 如果你需要一个获取数据的超时时间,在asyncio.wait_for(queue.get(), timeout=1.0)设置timeout值
# 如果未收到信息,输出
if data is None:
logger.info(
"退出BLE客户端...."
)
else:
try:
# temp_output=np.array(str(data, "utf-8").split(':'))
# temp = np.array(temp_output[0].split(','))
# if(np.array_equal(EMG_NUM.EMG,temp_output)):
# EMG_NUM.sudden=True
# EMG_NUM.Connect=False
# else:
# EMG_NUM.EMG=temp_output
# EMG_NUM.EMG = temp
print(data)
print(type(data))
print(str(data, "utf-8").split(','))
temp_output=str(data, "utf-8").split(',')
print("temp_out:"+temp_output)
if EMG_NUM.temp_threshold:
for i in range(6):
EMG_NUM.EMG_min_max[i][0],EMG_NUM.EMG_min_max[i][1]=temp_output[i]-temp_output[i]/2,temp_output[i]+temp_output[i]/2
EMG_NUM.temp_threshold=False
print(temp_output)
EMG_NUM.EMG = np.array(temp_output)
# print(temp_output)
if EMG_NUM.Write:
f = open('write_sEMG.txt', 'a+')
f.write(str(temp_output) + '\n')
f.close()
except:
pass
# BLE通常的连接间隔约为30到50毫秒,因此每秒只能发送20到30个数据包。
# HID设备可以有15毫秒的最快间隔(取决于配置),即每秒66个数据包。
# sEMG每秒发送36000字节,需调整
# 大多数设备的最大MTU为512字节,因此这将产生66 * 512 = 大约34000字节每秒
# 因此,ESP32上的缓冲区可能因试图以比可能更快的速度发送数据而溢出。
# 创建一个客户端事件,并且定时决定什么时候中断接收
# await client.stop_notify(char_uuid)
# Send an "exit command to the consumer"退出命令到外部设备
# 打印外部设备传来的信息队列
# 按钮事件发生时记录EMG数据
async def main(address: str, char_uuid: str):
async with BleakClient(address) as client:
EMG_NUM.Connect=client.is_connected
EMG_NUM.sudden=False
# 开启客户端通知
await client.start_notify(char_uuid, callback_handler)
# 主程序未结束前一直循环,防止中断连接
while EMG_NUM.Connect:
# 抛出中断异常并结束循环
if client.is_connected:
EMG_NUM.sudden=False
else:
EMG_NUM.sudden=True
EMG_NUM.Connect=False
break
# 持续循环输出
if EMG_NUM.Kill:
# 表示主程序终止,立刻终止本次循环
break
else:
await asyncio.sleep(1.0)
def ble():
# 主程序未退出时,循环寻找设备中
# 同时判断未连接状态和主程序kill为true时跳出
while EMG_NUM.Kill == False:
try:
logging.basicConfig(level=logging.INFO)
asyncio.run(
main(
"7C:9E:BD:48:6A:36",
"ca73b3ba-39f6-4ab3-91ae-186dc9577d99",
)
)
except:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment