Skip to content

Instantly share code, notes, and snippets.

@dmitrodem
Created October 28, 2020 14:49
Show Gist options
  • Save dmitrodem/9384e4f7d38b985939fc2982a0c80b10 to your computer and use it in GitHub Desktop.
Save dmitrodem/9384e4f7d38b985939fc2982a0c80b10 to your computer and use it in GitHub Desktop.
Bluedio Headset control application
#!/usr/bin/env python3
import bluetooth
from enum import Enum
import threading
import time
import sys
import argparse
class BluedioPlayAuto(Enum):
UNKNOWN = 0x00
OFF = 0x10
ON = 0x11
class BluedioSleepProtection(Enum):
UNKNOWN = 0x00
OFF = 0x20
ON = 0x21
class BluedioTapTwice(Enum):
UNKNOWN = 0x00
OFF = 0x30
ON = 0x31
class BluedioHintLanguage(Enum):
UNKNOWN = 0x00
CHINESE = 0x90
ENGLISH = 0x91
SPANISH = 0x92
FRENCH = 0x93
class BluedioHeadset:
_play_auto = None
_sleep_protection = None
_tap_twice = None
_hint_language = None
_rcookie = None
_wcookie = None
@classmethod
def empty_packet(cls):
packet = bytearray(14)
packet[0] = 0xaa
packet[12] = 0x0b
packet[13] = 0xbb
return packet
def __init__(self, address, port = 7):
self.event = threading.Event()
self.event.clear()
self.thread = threading.Thread(name = "UpdThread", target = self._update_status, args = (address, port))
self.thread.start()
def _update_status(self, address, port):
self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
self.sock.connect((address, port))
self.event.set()
while True:
try:
data = self.sock.recv(128)
except bluetooth.btcommon.BluetoothError:
return
if (len(data) != 14):
continue
if (data[0] != 0xaa):
continue
if (data[13] != 0xbb):
continue
self._rcookie = time.time()
self._play_auto = BluedioPlayAuto(data[1])
self._sleep_protection = BluedioSleepProtection(data[2])
self._tap_twice = BluedioTapTwice(data[3])
self._hint_language = BluedioHintLanguage(data[9])
self.event.set()
@classmethod
def on_off(cls, value):
if value in [True, 1, "ON", "on"]:
return "ON"
elif value in [False, 0, "OFF", "off"]:
return "OFF"
else:
return "UNKNOWN"
@property
def play_auto(self):
return self._play_auto
@play_auto.setter
def play_auto(self, value):
if value == None:
return
self.event.clear()
packet = BluedioHeadset.empty_packet()
if isinstance(value, BluedioPlayAuto):
packet[1] = value.value
else:
packet[1] = BluedioPlayAuto[self.on_off(value)].value
print(value)
self.sock.send(bytes(packet))
self._wcookie = time.time()
@property
def sleep_protection(self):
return self._sleep_protection
@sleep_protection.setter
def sleep_protection(self, value):
if value == None:
return
self.event.clear()
packet = BluedioHeadset.empty_packet()
if isinstance(value, BluedioSleepProtection):
packet[2] = value.value
else:
packet[2] = BluedioSleepProtection[self.on_off(value)].value
self.sock.send(bytes(packet))
self._wcookie = time.time()
@property
def tap_twice(self):
return self._tap_twice
@tap_twice.setter
def tap_twice(self, value):
if value == None:
return
self.event.clear()
packet = BluedioHeadset.empty_packet()
if isinstance(value, BluedioTapTwice):
packet[3] = value.value
else:
packet[3] = BluedioTapTwice[self.on_off(value)].value
self.sock.send(bytes(packet))
self._wcookie = time.time()
@property
def hint_language(self):
return self._hint_language
@hint_language.setter
def hint_language(self, value):
if value == None:
return
self.event.clear()
packet = BluedioHeadset.empty_packet()
if isinstance(value, BluedioHintLanguage):
packet[9] = value.value
else:
try:
lang = BluedioHintLanguage[value.upper()]
except KeyError:
lang = BluedioHintLanguage.UNKNOWN
packet[9] = lang.value
self.sock.send(bytes(packet))
self._wcookie = time.time()
def shutdown(self):
self.sock.close()
self.thread.join()
def __repr__(self):
if (self._rcookie):
v = []
v.append(f"[{self._play_auto.name:>8}] Play Auto")
v.append(f"[{self._sleep_protection.name:>8}] Sleep Protection")
v.append(f"[{self._tap_twice.name:>8}] Tap Twice")
v.append(f"[{self._hint_language.name:>8}] Hint Language")
return "\n".join(v)
else:
return ""
class ActionNoYes(argparse.Action):
def __init__(self, option_strings, dest, default=None, required=False, help=None):
# if default is None:
# raise ValueError('You must provide a default with Yes/No action')
if len(option_strings) != 1:
print(option_strings)
raise ValueError('Only single argument is allowed with Yes/No action')
opt = option_strings[0]
if not opt.startswith('--'):
raise ValueError('Yes/No arguments must be prefixed with --')
opt = opt[2:]
opts = ['--' + opt, '--no-' + opt]
super(ActionNoYes, self).__init__(opts, dest, nargs=0, const=None,
default=default, required=required, help=help)
def __call__(self, parser, namespace, values, option_strings=None):
if option_strings.startswith('--no-'):
setattr(namespace, self.dest, False)
else:
setattr(namespace, self.dest, True)
def main():
parser = argparse.ArgumentParser(description = "Control Bluedio Headset")
parser.add_argument("-a", "--address", nargs = 1, required = True, dest = "address", help = "Headset address")
parser.add_argument('--play-auto', dest = "play_auto", help = "Proximity sensor", action = ActionNoYes)
parser.add_argument("--sleep-protection", dest = "sleep_protection", help = "Auto-sleep", action = ActionNoYes)
parser.add_argument("--tap-twice", dest = "tap_twice", help = "Double-tap to answer", action = ActionNoYes)
parser.add_argument("--hint-language", nargs = 1, dest = "hint_language",
choices = [e.name.lower() for e in BluedioHintLanguage][1:], help = "Hint language")
args = parser.parse_args(sys.argv[1:])
bluedio = BluedioHeadset(args.address[0])
bluedio.event.wait()
bluedio.event.clear()
bluedio.play_auto = args.play_auto
bluedio.sleep_protection = args.sleep_protection
bluedio.tap_twice = args.tap_twice
if args.hint_language:
bluedio.hint_language = args.hint_language[0]
bluedio.event.wait()
print(bluedio)
bluedio.shutdown()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment