Created
October 28, 2020 14:49
-
-
Save dmitrodem/9384e4f7d38b985939fc2982a0c80b10 to your computer and use it in GitHub Desktop.
Bluedio Headset control application
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import bluetooth | |
from enum import Enum | |
import threading | |
import time | |
import sys | |
import argparse | |
class BluedioPlayAuto(Enum):
    """Wire codes for the "play auto" setting.

    BluedioHeadset reads and writes these codes at byte 1 of a packet;
    the command line describes the option as "Proximity sensor".
    """

    # 0x00 doubles as "no value": it is what an otherwise empty packet carries.
    UNKNOWN = 0x00
    OFF = 0x10
    ON = 0x11
class BluedioSleepProtection(Enum):
    """Wire codes for the "sleep protection" setting.

    BluedioHeadset reads and writes these codes at byte 2 of a packet;
    the command line describes the option as "Auto-sleep".
    """

    # 0x00 doubles as "no value": it is what an otherwise empty packet carries.
    UNKNOWN = 0x00
    OFF = 0x20
    ON = 0x21
class BluedioTapTwice(Enum):
    """Wire codes for the "tap twice" setting.

    BluedioHeadset reads and writes these codes at byte 3 of a packet;
    the command line describes the option as "Double-tap to answer".
    """

    # 0x00 doubles as "no value": it is what an otherwise empty packet carries.
    UNKNOWN = 0x00
    OFF = 0x30
    ON = 0x31
class BluedioHintLanguage(Enum):
    """Wire codes for the hint-language setting.

    BluedioHeadset reads and writes these codes at byte 9 of a packet.
    UNKNOWN must stay the first member: the command-line parser builds its
    list of choices by dropping the first entry of this enum.
    """

    UNKNOWN = 0x00
    CHINESE = 0x90
    ENGLISH = 0x91
    SPANISH = 0x92
    FRENCH = 0x93
class BluedioHeadset:
    """Read and change Bluedio headset settings over a Bluetooth RFCOMM socket.

    Constructing an instance starts a background thread ("UpdThread") that
    connects to the headset and then decodes every status packet it receives.
    ``event`` is set once the connection is up, after each decoded status
    packet, and when the reader thread ends for any reason, so a caller
    blocked in ``event.wait()`` can never wait on a dead thread; check
    ``thread.is_alive()`` to tell the cases apart.

    Packets are 14 bytes long, start with 0xaa and end with 0xbb. Settings
    live at fixed offsets: 1 = play auto, 2 = sleep protection,
    3 = tap twice, 9 = hint language.
    """

    # Last values decoded from a status packet; None until one has arrived.
    _play_auto = None
    _sleep_protection = None
    _tap_twice = None
    _hint_language = None
    # time.time() of the last decoded status packet / the last packet sent.
    _rcookie = None
    _wcookie = None

    @classmethod
    def empty_packet(cls):
        """Return a zeroed 14-byte packet with only the framing bytes set."""
        packet = bytearray(14)
        packet[0] = 0xaa
        # NOTE(review): meaning of 0x0b at offset 12 is not visible here;
        # it is copied verbatim into every outgoing packet.
        packet[12] = 0x0b
        packet[13] = 0xbb
        return packet

    def __init__(self, address, port=7):
        """Start the reader thread that connects to *address* on RFCOMM *port*."""
        self.event = threading.Event()
        self.thread = threading.Thread(
            name="UpdThread", target=self._update_status, args=(address, port)
        )
        self.thread.start()

    @staticmethod
    def _decode(enum_cls, raw):
        """Map a raw status byte to *enum_cls*, or its UNKNOWN member if unmapped."""
        try:
            return enum_cls(raw)
        except ValueError:
            # An unexpected code must not kill the reader thread.
            return enum_cls.UNKNOWN

    def _update_status(self, address, port):
        """Thread body: connect, then decode status packets until the link ends."""
        try:
            self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
            self.sock.connect((address, port))
            self.event.set()
            while True:
                try:
                    data = self.sock.recv(128)
                except bluetooth.btcommon.BluetoothError:
                    return
                if not data:
                    # presumably an empty read means the peer closed the link;
                    # looping on it would spin forever.
                    return
                if len(data) != 14 or data[0] != 0xaa or data[13] != 0xbb:
                    continue
                self._play_auto = self._decode(BluedioPlayAuto, data[1])
                self._sleep_protection = self._decode(BluedioSleepProtection, data[2])
                self._tap_twice = self._decode(BluedioTapTwice, data[3])
                self._hint_language = self._decode(BluedioHintLanguage, data[9])
                # Published last: __repr__ uses it as "all fields are valid".
                self._rcookie = time.time()
                self.event.set()
        finally:
            # Release waiters on every exit path (connect failure, disconnect,
            # unexpected error) instead of leaving them blocked forever.
            self.event.set()

    def _send_setting(self, index, code):
        """Send a packet whose only payload is *code* at offset *index*."""
        # Cleared first so the next event.wait() blocks until a fresh status.
        self.event.clear()
        packet = self.empty_packet()
        packet[index] = code
        self.sock.send(bytes(packet))
        self._wcookie = time.time()

    @classmethod
    def on_off(cls, value):
        """Normalise a switch value to the enum member name "ON"/"OFF"/"UNKNOWN"."""
        if value in (True, 1, "ON", "on"):
            return "ON"
        if value in (False, 0, "OFF", "off"):
            return "OFF"
        return "UNKNOWN"

    @property
    def play_auto(self):
        """Last reported BluedioPlayAuto value, or None if no status yet."""
        return self._play_auto

    @play_auto.setter
    def play_auto(self, value):
        # None means "leave unchanged"; anything else is sent to the headset.
        if value is None:
            return
        if not isinstance(value, BluedioPlayAuto):
            value = BluedioPlayAuto[self.on_off(value)]
        self._send_setting(1, value.value)

    @property
    def sleep_protection(self):
        """Last reported BluedioSleepProtection value, or None if no status yet."""
        return self._sleep_protection

    @sleep_protection.setter
    def sleep_protection(self, value):
        if value is None:
            return
        if not isinstance(value, BluedioSleepProtection):
            value = BluedioSleepProtection[self.on_off(value)]
        self._send_setting(2, value.value)

    @property
    def tap_twice(self):
        """Last reported BluedioTapTwice value, or None if no status yet."""
        return self._tap_twice

    @tap_twice.setter
    def tap_twice(self, value):
        if value is None:
            return
        if not isinstance(value, BluedioTapTwice):
            value = BluedioTapTwice[self.on_off(value)]
        self._send_setting(3, value.value)

    @property
    def hint_language(self):
        """Last reported BluedioHintLanguage value, or None if no status yet."""
        return self._hint_language

    @hint_language.setter
    def hint_language(self, value):
        if value is None:
            return
        if not isinstance(value, BluedioHintLanguage):
            # Language names are matched case-insensitively against the enum.
            try:
                value = BluedioHintLanguage[value.upper()]
            except KeyError:
                value = BluedioHintLanguage.UNKNOWN
        self._send_setting(9, value.value)

    def shutdown(self):
        """Close the socket and wait for the reader thread to finish."""
        # The socket is created by the reader thread and may not exist yet.
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()
        self.thread.join()

    def __repr__(self):
        """Return one line per setting, or "" before the first status packet."""
        if not self._rcookie:
            return ""
        return "\n".join([
            f"[{self._play_auto.name:>8}] Play Auto",
            f"[{self._sleep_protection.name:>8}] Sleep Protection",
            f"[{self._tap_twice.name:>8}] Tap Twice",
            f"[{self._hint_language.name:>8}] Hint Language",
        ])
class ActionNoYes(argparse.Action):
    """argparse action that registers ``--opt`` and ``--no-opt`` as a pair.

    ``--opt`` stores True and ``--no-opt`` stores False in the destination;
    when neither is given the destination keeps *default* (None unless
    overridden), which lets callers distinguish "not specified".
    """

    def __init__(self, option_strings, dest, default=None, required=False, help=None):
        """Expand the single ``--opt`` option string into the yes/no pair.

        Raises:
            ValueError: if more than one option string is given, or the
                option does not start with ``--``.
        """
        if len(option_strings) != 1:
            raise ValueError('Only single argument is allowed with Yes/No action')
        opt = option_strings[0]
        if not opt.startswith('--'):
            raise ValueError('Yes/No arguments must be prefixed with --')
        name = opt[2:]
        # nargs=0: both spellings are pure flags and consume no value.
        super().__init__(['--' + name, '--no-' + name], dest, nargs=0, const=None,
                         default=default, required=required, help=help)

    def __call__(self, parser, namespace, values, option_strings=None):
        """Store False for the ``--no-`` spelling, True otherwise.

        argparse passes the option string that was actually used as the
        fourth positional argument; a missing one is treated as the positive
        spelling instead of raising AttributeError.
        """
        negated = option_strings is not None and option_strings.startswith('--no-')
        setattr(namespace, self.dest, not negated)
def main():
    """Parse the command line, apply the requested settings and print the status.

    Options that were not given stay None and are skipped, so only the
    settings named on the command line are sent to the headset.
    """
    parser = argparse.ArgumentParser(description="Control Bluedio Headset")
    parser.add_argument("-a", "--address", nargs=1, required=True, dest="address",
                        help="Headset address")
    parser.add_argument('--play-auto', dest="play_auto", help="Proximity sensor",
                        action=ActionNoYes)
    parser.add_argument("--sleep-protection", dest="sleep_protection", help="Auto-sleep",
                        action=ActionNoYes)
    parser.add_argument("--tap-twice", dest="tap_twice", help="Double-tap to answer",
                        action=ActionNoYes)
    # The first enum member (UNKNOWN) is not a selectable language.
    parser.add_argument("--hint-language", nargs=1, dest="hint_language",
                        choices=[e.name.lower() for e in BluedioHintLanguage][1:],
                        help="Hint language")
    args = parser.parse_args(sys.argv[1:])

    bluedio = BluedioHeadset(args.address[0])
    try:
        # First wait: the reader thread signals once it is connected.
        bluedio.event.wait()
        bluedio.event.clear()
        bluedio.play_auto = args.play_auto
        bluedio.sleep_protection = args.sleep_protection
        bluedio.tap_twice = args.tap_twice
        if args.hint_language:
            bluedio.hint_language = args.hint_language[0]
        # Second wait: a status packet has been decoded.
        bluedio.event.wait()
        print(bluedio)
    finally:
        # Always close the socket and join the reader thread; otherwise an
        # error or Ctrl-C leaves the non-daemon thread blocked in recv().
        bluedio.shutdown()
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment