Created
February 21, 2023 03:04
-
-
Save arpruss/19168c0a6abf46ae442ab1ace52cab9b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library.
import asyncio
import struct
import logging
import math
# pynput supplies the synthetic keyboard used to press/release arrow keys.
from pynput.keyboard import Controller,Key
keyboard = Controller() # Create the controller
# bleak supplies the BLE client and scanner used by run() and scan().
from bleak import BleakClient,BleakScanner
# NOTE(review): the three imports below pull in bleak's WinRT backend and
# service classes; nothing in the visible code references them by name.
# Importing the winrt backend presumably ties the script to Windows -- confirm
# before removing or porting.
import bleak.backends.winrt.client
from bleak.backends.winrt.service import BleakGATTServiceWinRT
from bleak.backends.service import BleakGATTServiceCollection
# Root logger at DEBUG level, configured as soon as the module is loaded.
logging.basicConfig(level=logging.DEBUG)
def uuid16(n):
    """Return the 128-bit UUID string built from the 16-bit value *n*.

    *n* is rendered as four upper-case hex digits (zero padded) and placed
    after the leading "0000" of the fixed
    "0000xxxx-0000-1000-8000-00805F9B34FB" template.
    """
    short_hex = "%04X" % n
    return "0000" + short_hex + "-0000-1000-8000-00805F9B34FB"
# Service UUID handed to BleakScanner as its service_uuids filter in scan().
ACCEL_SERVICE = "b9590f4e-f0c4-46cc-8c4f-096fec764f91"
# Characteristic UUID that run() subscribes to for notifications.
GRAVITY_MEASUREMENT = "a0e83db5-08eb-44c2-a493-5e5f3dfce286"
# gravity() only selects arrow keys while the computed altitude angle is
# below 90 - MIN_ANGLE degrees.
MIN_ANGLE = 6
# Keys currently held down by gravity() on the horizontal (dx) and vertical
# (dy) axes; None means no key is held on that axis.
dx=None
dy=None
def gravity(values):
    """Translate one 3-component reading into arrow-key presses.

    ``values[0]`` and ``values[1]`` form the horizontal component and
    ``values[2]`` the vertical one. From these an altitude angle and an
    azimuth (both in degrees, azimuth normalised to 0..360) are derived.
    While the altitude is below ``90 - MIN_ANGLE`` the azimuth selects an
    up/down key and/or a left/right key; otherwise no key is selected.

    The module-level ``dx`` / ``dy`` hold the keys currently pressed. When the
    selection for an axis changes, the old key (if any) is released and the
    new one (if any) is pressed, then ``dx`` / ``dy`` are updated.

    An all-zero reading is ignored and leaves the key state untouched.
    """
    global dx, dy

    gx, gy, gz = values[0], values[1], values[2]
    horizontal = math.sqrt(gx**2 + gy**2)

    # A zero vector carries no direction at all: nothing to do.
    if horizontal == 0 and gz == 0:
        return

    altitude = math.atan2(gz, horizontal) * 180/math.pi

    if horizontal == 0:
        # No horizontal component, so the azimuth is undefined; use 0.
        heading = 0
    else:
        heading = -math.atan2(gy, -gx) * 180/math.pi
        if heading < 0:
            heading += 360

    wanted_x = None
    wanted_y = None
    if altitude < 90 - MIN_ANGLE:
        # Vertical axis: 45..135 degrees -> up, 225..315 degrees -> down.
        if 45 <= heading <= 135:
            wanted_y = Key.up
        elif 225 <= heading <= 315:
            wanted_y = Key.down
        # Horizontal axis: within 45 degrees of 0/360 -> right,
        # 135..225 degrees -> left.
        if heading <= 45 or heading >= 315:
            wanted_x = Key.right
        elif 135 <= heading <= 225:
            wanted_x = Key.left

    def switch_key(held, wanted):
        # Release the key held on this axis and press the new one, but only
        # when the selection actually changed.
        if wanted != held:
            if held:
                print("rel", held)
                keyboard.release(held)
            if wanted:
                keyboard.press(wanted)
                print(wanted)

    switch_key(dx, wanted_x)
    switch_key(dy, wanted_y)
    dx, dy = wanted_x, wanted_y
async def run(address, debug=False):
    """Connect to *address* and forward its notifications to ``gravity``.

    Opens a ``BleakClient`` for *address*, subscribes to the
    ``GRAVITY_MEASUREMENT`` characteristic, and then polls once per second
    until the client reports it is no longer connected.

    Args:
        address: Device (or address) accepted by ``BleakClient``.
        debug: Unused; kept so existing callers keep working.
    """
    async with BleakClient(address) as client:
        logging.info("connecting")
        # ``is_connected`` is a property in bleak; awaiting it as a method is
        # the deprecated form. ``bool()`` keeps the printed value a plain
        # True/False on releases that wrap the property value.
        connected = bool(client.is_connected)
        print("Connected: {0}".format(connected))

        def accel_handler(sender, data):
            # Each notification is unpacked as three native-order floats.
            # NOTE(review): assumes the payload is exactly 12 bytes and the
            # host byte order matches the device's -- confirm.
            values = struct.unpack('fff', data)
            gravity(values)

        await client.start_notify(GRAVITY_MEASUREMENT, accel_handler)

        # Keep the connection (and the notification subscription) alive.
        while client.is_connected:
            await asyncio.sleep(1)
async def scan(debug=False):
    """Scan until a device is detected, then hand it to ``run``.

    A ``BleakScanner`` is started with ``ACCEL_SERVICE`` as its service UUID
    filter. The first detection callback stores the device in the
    module-level ``foundDevice`` and ends the scan; the device is then
    printed and passed to ``run``.

    Args:
        debug: Unused; kept so existing callers keep working.
    """
    global foundDevice

    foundDevice = None
    device_seen = asyncio.Event()

    def on_detection(device, advertising_data):
        # Remember the detected device and wake up the waiting scan.
        global foundDevice
        foundDevice = device
        device_seen.set()

    async with BleakScanner(on_detection, service_uuids=[ACCEL_SERVICE]):
        # Block here until on_detection fires; leaving the block stops the
        # scanner.
        await device_seen.wait()

    if not foundDevice:
        return
    print("Found device", foundDevice)
    await run(foundDevice)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs scan() to completion and
    # closes the loop afterwards. Calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    asyncio.run(scan(debug=True))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment