Skip to content

Instantly share code, notes, and snippets.

@allrobot
Created March 29, 2022 14:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save allrobot/312ece60434a537cd3cad16a4110b9d4 to your computer and use it in GitHub Desktop.
Save allrobot/312ece60434a537cd3cad16a4110b9d4 to your computer and use it in GitHub Desktop.
import sys
import time
import asyncio
import logging
from bleak import BleakClient
logger = logging.getLogger()
# Run the BLE client.
async def run_ble_client(address: str, char_uuid: str, queue: asyncio.Queue):
    """Connect to a BLE device and forward its notifications into ``queue``.

    Every notification received on ``char_uuid`` is enqueued as a
    ``(time.time(), data)`` tuple. When the connection ends for any reason
    (the link drops, connecting fails, or this task is cancelled) a final
    ``(time.time(), None)`` tuple is enqueued so the consumer can leave its
    loop.

    Args:
        address: Address of the BLE device handed to ``BleakClient``.
        char_uuid: UUID of the characteristic to subscribe to.
        queue: Queue shared with the consumer coroutine.
    """
    # How often (seconds) the connection state is re-checked.
    poll_interval = 1.0

    async def callback_handler(sender, data):
        # Put one item on the queue; if the queue is bounded and full this
        # waits until a slot becomes free.
        await queue.put((time.time(), data))

    try:
        async with BleakClient(address) as client:
            # Log the BLE connection state.
            logger.info(f"Connected: {client.is_connected}")
            # Enable notifications for the characteristic.
            await client.start_notify(char_uuid, callback_handler)
            # Stay subscribed for as long as the link is up. The previous
            # unconditional ``while True`` never ended, which made the exit
            # sentinel below unreachable.
            while client.is_connected:
                await asyncio.sleep(poll_interval)
            # Leaving the ``async with`` block lets BleakClient tear the
            # connection down, so no explicit stop_notify() call is made.
    finally:
        # Send an "exit command" to the consumer, even on error/cancellation.
        await queue.put((time.time(), None))
# Print the queue of messages received from the external device.
async def run_queue_consumer(queue: asyncio.Queue):
    """Print each payload taken from ``queue`` until the exit sentinel arrives.

    Items are ``(epoch, data)`` tuples. A ``data`` of ``None`` is the
    producer's signal that it has finished; the coroutine logs that and
    returns. Any other ``data`` is decoded as UTF-8 and printed.

    Args:
        queue: Queue the producer fills with ``(epoch, data)`` tuples.
    """
    while True:
        # Use ``await asyncio.wait_for(queue.get(), timeout=1.0)`` instead if
        # you want a timeout while waiting for data.
        _epoch, payload = await queue.get()

        # ``None`` is the exit sentinel, not a real payload.
        if payload is None:
            logger.info(
                "Got message from client about disconnection. Exiting consumer loop..."
            )
            return

        # errors="replace": a payload containing bytes that are not valid
        # UTF-8 is printed with U+FFFD placeholders instead of raising
        # UnicodeDecodeError and terminating the consumer.
        print(str(payload, "utf-8", errors="replace"))
async def main(address: str, char_uuid: str):
    """Run the BLE client and the queue consumer concurrently.

    A single ``asyncio.Queue`` is created and handed to both coroutines so
    the client (producer) and the printing loop (consumer) can coordinate.
    Returns once both coroutines have finished.

    Args:
        address: Device address forwarded to ``run_ble_client``.
        char_uuid: Characteristic UUID forwarded to ``run_ble_client``.
    """
    # Queue used to coordinate the producer and consumer coroutines.
    shared_queue = asyncio.Queue()

    # Producer first, consumer second; gather() waits for both.
    await asyncio.gather(
        run_ble_client(address, char_uuid, shared_queue),
        run_queue_consumer(shared_queue),
    )

    logger.info("Main method done.")
if __name__ == "__main__":
    # Show INFO-level log records from this script.
    logging.basicConfig(level=logging.INFO)

    # Hard-coded device address and notification characteristic UUID.
    entry_point = main(
        address="7C:9E:BD:48:6A:36",
        char_uuid="ca73b3ba-39f6-4ab3-91ae-186dc9577d99",
    )
    asyncio.run(entry_point)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment