# Source: GitHub Gist by @arpruss (arpruss/19168c0a6abf46ae442ab1ace52cab9b),
# created February 21, 2023 03:04.
import asyncio
import struct
import logging
import math
from pynput.keyboard import Controller,Key
# Single module-wide controller; gravity() presses and releases keys with it.
keyboard = Controller() # Create the controller
from bleak import BleakClient,BleakScanner
# NOTE(review): the three bleak.backends imports below are not referenced
# anywhere else in this file. The winrt ones presumably tie the script to
# bleak's Windows backend -- confirm before relying on or removing them.
import bleak.backends.winrt.client
from bleak.backends.winrt.service import BleakGATTServiceWinRT
from bleak.backends.service import BleakGATTServiceCollection
# Configure the root logger at DEBUG level at import time.
logging.basicConfig(level=logging.DEBUG)
def uuid16(n):
    """Return the 128-bit UUID string for the 16-bit short UUID *n*.

    The value is formatted as four upper-case hex digits and inserted into
    the ``0000xxxx-0000-1000-8000-00805F9B34FB`` template.

    Raises:
        ValueError: if *n* is outside ``0..0xFFFF``; such a value would not
            fit in four hex digits and would produce a malformed UUID.
    """
    if not 0 <= n <= 0xFFFF:
        raise ValueError("16-bit UUID out of range: %r" % (n,))
    return "0000%04X-0000-1000-8000-00805F9B34FB" % n
# Service UUID passed to BleakScanner as the advertisement filter in scan().
ACCEL_SERVICE = "b9590f4e-f0c4-46cc-8c4f-096fec764f91"
# Characteristic UUID that run() subscribes to; each notification is unpacked
# into three floats and handed to gravity().
GRAVITY_MEASUREMENT = "a0e83db5-08eb-44c2-a493-5e5f3dfce286"
# Dead zone in degrees: gravity() selects no keys unless the computed
# altitude is below 90 - MIN_ANGLE.
MIN_ANGLE = 6
# Keys currently held down by gravity() for the horizontal (dx) and vertical
# (dy) axes; None means no key is held on that axis.
dx=None
dy=None
def _swap_held_key(held, wanted):
    """Release *held* and press *wanted* when they differ (None = no key)."""
    if wanted == held:
        return
    if held:
        print("rel", held)
        keyboard.release(held)
    if wanted:
        keyboard.press(wanted)
        print(wanted)


def gravity(values):
    """Translate a 3-component vector into held arrow keys.

    Args:
        values: indexable with at least three numbers, read as
            ``values[0]``, ``values[1]``, ``values[2]`` (x, y, z).
            NOTE(review): units and axis orientation depend on the sending
            device -- confirm against the peripheral.

    The vector's altitude (angle above the x-y plane, in degrees) and its
    azimuth (angle within the x-y plane, normalised to ``[0, 360)``) are
    computed. Unless the altitude is within ``MIN_ANGLE`` degrees of
    straight up (+z), the azimuth selects the keys to hold:

    * 45..135   -> ``Key.up``
    * 225..315  -> ``Key.down``
    * <=45 or >=315 -> ``Key.right``
    * 135..225  -> ``Key.left``

    The module-level ``dx``/``dy`` remember which keys are held, so keys are
    pressed or released only when the selection changes. An all-zero vector
    is ignored.
    """
    global dx, dy

    x, y, z = values[0], values[1], values[2]

    # hypot avoids overflow/underflow of the intermediate squares.
    horiz = math.hypot(x, y)
    if horiz == 0 and z == 0:
        # No direction can be derived from a zero vector.
        return

    altitude = math.degrees(math.atan2(z, horiz))
    if horiz != 0:
        azimuth = -math.degrees(math.atan2(y, -x))
        if azimuth < 0:
            azimuth += 360
    else:
        # Vector lies on the z axis; azimuth is undefined, use 0.
        azimuth = 0

    newDX = None
    newDY = None
    if altitude < 90 - MIN_ANGLE:
        # Vertical axis.
        if 45 <= azimuth <= 180 - 45:
            newDY = Key.up
        elif 180 + 45 <= azimuth <= 360 - 45:
            newDY = Key.down
        # Horizontal axis; shares only the exact boundary angles with the
        # vertical sectors above.
        if azimuth <= 45 or azimuth >= 360 - 45:
            newDX = Key.right
        elif 90 + 45 <= azimuth <= 180 + 45:
            newDX = Key.left

    # Horizontal axis is updated first, then vertical.
    _swap_held_key(dx, newDX)
    _swap_held_key(dy, newDY)
    dx, dy = newDX, newDY
async def run(address, debug=False):
    """Connect to *address* and feed gravity notifications to gravity().

    Args:
        address: device (or address) accepted by ``BleakClient``.
        debug: unused; kept for interface compatibility.

    Subscribes to ``GRAVITY_MEASUREMENT`` and then polls once per second
    until the client reports it is no longer connected. When the session
    ends for any reason, any arrow keys still held are released.
    """
    # Three float32 values. Little-endian is spelled out so decoding does not
    # depend on the host's byte order; on little-endian hosts this is
    # identical to the native 'fff' layout.
    # NOTE(review): assumes the peripheral sends little-endian floats -- confirm.
    payload = struct.Struct("<3f")

    def accel_handler(sender, data):
        # Skip malformed packets rather than raising struct.error inside the
        # notification callback.
        if len(data) != payload.size:
            logging.warning(
                "ignoring %d-byte gravity notification (expected %d)",
                len(data), payload.size)
            return
        gravity(payload.unpack(data))

    try:
        # Entering the BleakClient context performs the connection, so log
        # before it rather than after.
        logging.info("connecting")
        async with BleakClient(address) as client:
            # is_connected is a plain bool property in current bleak; the
            # awaitable call form is deprecated.
            connected = client.is_connected
            print("Connected: {0}".format(connected))
            await client.start_notify(GRAVITY_MEASUREMENT, accel_handler)
            while client.is_connected:
                await asyncio.sleep(1)
    finally:
        # A vector pointing straight up falls in gravity()'s dead zone, so it
        # selects no keys and releases whatever is still held. Done after the
        # client context has exited so a late notification cannot re-press.
        gravity((0.0, 0.0, 1.0))
async def scan(debug=False):
    """Scan for a device advertising ACCEL_SERVICE and hand it to run().

    Args:
        debug: unused; kept for interface compatibility.

    Waits indefinitely until the scanner's detection callback fires, stops
    scanning, then connects to the detected device via ``run()``. The device
    is also stored in the module-level ``foundDevice``.
    """
    global foundDevice
    foundDevice = None
    stop_event = asyncio.Event()

    def callback(device, advertising_data):
        global foundDevice
        # Keep the first match; later advertisements that arrive before the
        # scanner has stopped must not swap the device out.
        if foundDevice is None:
            foundDevice = device
        stop_event.set()

    # Leaving the context stops the scan before the connection attempt.
    async with BleakScanner(callback, service_uuids=[ACCEL_SERVICE]):
        await stop_event.wait()

    if foundDevice:
        print("Found device", foundDevice)
        await run(foundDevice)
if __name__ == "__main__":
    # asyncio.run() creates, runs and closes the event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    asyncio.run(scan(debug=True))
# (End of gist; the GitHub page's sign-up / sign-in comment prompt followed here.)