Created
April 16, 2022 02:08
-
-
Save allrobot/0fcd8c06f1351669445f5263348fd5ad to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ctypes | |
import asyncio | |
import logging | |
import EMG_NUM | |
import numpy as np | |
from bleak import BleakClient | |
logger = logging.getLogger() | |
# 运行BLE客户端 | |
def callback_handler(sender,data): | |
# Use await asyncio.wait_for(queue.get(), timeout=1.0) if you want a timeout for getting data. | |
# 如果你需要一个获取数据的超时时间,在asyncio.wait_for(queue.get(), timeout=1.0)设置timeout值 | |
# 如果未收到信息,输出 | |
if data is None: | |
logger.info( | |
"退出BLE客户端...." | |
) | |
else: | |
try: | |
# temp_output=np.array(str(data, "utf-8").split(':')) | |
# temp = np.array(temp_output[0].split(',')) | |
# if(np.array_equal(EMG_NUM.EMG,temp_output)): | |
# EMG_NUM.sudden=True | |
# EMG_NUM.Connect=False | |
# else: | |
# EMG_NUM.EMG=temp_output | |
# EMG_NUM.EMG = temp | |
print(data) | |
print(type(data)) | |
print(str(data, "utf-8").split(',')) | |
temp_output=str(data, "utf-8").split(',') | |
print("temp_out:"+temp_output) | |
if EMG_NUM.temp_threshold: | |
for i in range(6): | |
EMG_NUM.EMG_min_max[i][0],EMG_NUM.EMG_min_max[i][1]=temp_output[i]-temp_output[i]/2,temp_output[i]+temp_output[i]/2 | |
EMG_NUM.temp_threshold=False | |
print(temp_output) | |
EMG_NUM.EMG = np.array(temp_output) | |
# print(temp_output) | |
if EMG_NUM.Write: | |
f = open('write_sEMG.txt', 'a+') | |
f.write(str(temp_output) + '\n') | |
f.close() | |
except: | |
pass | |
# BLE通常的连接间隔约为30到50毫秒,因此每秒只能发送20到30个数据包。 | |
# HID设备可以有15毫秒的最快间隔(取决于配置),即每秒66个数据包。 | |
# sEMG每秒发送36000字节,需调整 | |
# 大多数设备的最大MTU为512字节,因此这将产生66 * 512 = 大约34000字节每秒 | |
# 因此,ESP32上的缓冲区可能因试图以比可能更快的速度发送数据而溢出。 | |
# 创建一个客户端事件,并且定时决定什么时候中断接收 | |
# await client.stop_notify(char_uuid) | |
# Send an "exit command to the consumer"退出命令到外部设备 | |
# 打印外部设备传来的信息队列 | |
# 按钮事件发生时记录EMG数据 | |
async def main(address: str, char_uuid: str): | |
async with BleakClient(address) as client: | |
EMG_NUM.Connect=client.is_connected | |
EMG_NUM.sudden=False | |
# 开启客户端通知 | |
await client.start_notify(char_uuid, callback_handler) | |
# 主程序未结束前一直循环,防止中断连接 | |
while EMG_NUM.Connect: | |
# 抛出中断异常并结束循环 | |
if client.is_connected: | |
EMG_NUM.sudden=False | |
else: | |
EMG_NUM.sudden=True | |
EMG_NUM.Connect=False | |
break | |
# 持续循环输出 | |
if EMG_NUM.Kill: | |
# 表示主程序终止,立刻终止本次循环 | |
break | |
else: | |
await asyncio.sleep(1.0) | |
def ble(): | |
# 主程序未退出时,循环寻找设备中 | |
# 同时判断未连接状态和主程序kill为true时跳出 | |
while EMG_NUM.Kill == False: | |
try: | |
logging.basicConfig(level=logging.INFO) | |
asyncio.run( | |
main( | |
"7C:9E:BD:48:6A:36", | |
"ca73b3ba-39f6-4ab3-91ae-186dc9577d99", | |
) | |
) | |
except: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment