Skip to content

Instantly share code, notes, and snippets.

@aaugustin
Last active January 25, 2022 14:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaugustin/3b6f8f4ee11d08dbe4bc59432d9c8cf4 to your computer and use it in GitHub Desktop.
Save aaugustin/3b6f8f4ee11d08dbe4bc59432d9c8cf4 to your computer and use it in GitHub Desktop.
Hook a Griffin PowerMate to a Raspberry Pi running HifiBerry!
#!/usr/bin/env python3.8
"""
To enable this service, copy this file to /opt, then:
# chmod +x /opt/powermate.py
# pip3.8 install websockets
# cat > /etc/systemd/system/powermate.service
[Unit]
Description=PowerMate
Wants=beocreate2.service
After=beocreate2.service
[Service]
ExecStartPre=/opt/hifiberry/bin/bootmsg "Starting PowerMate"
ExecStart=/opt/powermate.py
StandardOutput=journal
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
# systemctl daemon-reload
# systemctl enable powermate
# systemctl start powermate
"""
import asyncio
import dataclasses
import glob
import json
import logging
import signal
import struct
import sys
import time
import websockets
# ~~~~~~~~ ~~~~~~~~ PowerMate connector ~~~~~~~~ ~~~~~~~~ #
@dataclasses.dataclass
class PowerMateEvent:
    """
    PowerMate event read from or written to /dev/input/eventX.

    ``time_`` is the event timestamp in seconds, as a float; ``type_``,
    ``code`` and ``value`` are the raw integers carried by the kernel's
    ``struct input_event``.

    """

    time_: float
    type_: int
    code: int
    value: int

    # See https://www.kernel.org/doc/Documentation/input/input.txt and
    # uapi/linux/input.h. The wire layout is:
    #
    #     struct input_event {
    #         struct timeval time;    /* tv_sec, tv_usec: two native longs */
    #         unsigned short type;
    #         unsigned short code;
    #         __s32 value;            /* signed 32-bit, e.g. -1 for "left" */
    #     };
    #
    # ``value`` must be packed as a native int ("i"), not a native long: a
    # long is 8 bytes on 64-bit Linux, which would make the struct 32 bytes
    # instead of the 24 bytes the kernel actually reads and writes. On 32-bit
    # systems both layouts are 16 bytes, so this is backward-compatible.
    FORMAT = "llHHi"
    SIZE = struct.calcsize(FORMAT)

    @classmethod
    def parse(cls, data):
        """
        Build an event from exactly ``SIZE`` bytes read from the device.

        Raises ``struct.error`` if ``data`` doesn't have the expected length.

        """
        tv_sec, tv_usec, type_, code, value = struct.unpack(cls.FORMAT, data)
        # Fold seconds and microseconds into a single float timestamp.
        return cls(tv_sec + tv_usec / 1_000_000, type_, code, value)

    def serialize(self):
        """
        Return the event packed as ``SIZE`` bytes, ready to write to the device.

        """
        # Split the float timestamp back into whole seconds and microseconds.
        tv_sec = int(self.time_)
        tv_usec = int((self.time_ - tv_sec) * 1_000_000)
        return struct.pack(
            self.FORMAT, tv_sec, tv_usec, self.type_, self.code, self.value
        )
class PowerMate:
    """
    Asynchronous driver for a Griffin PowerMate input device.

    Events are read from (and LED commands written to) the device file
    opened by ``run()``; rotations and button presses are forwarded to the
    callbacks given to the constructor.

    """

    LOGGER = logging.getLogger("powermate")

    # Glob pattern used to locate the device when no path is given to run().
    DEV_INPUT = "/dev/input/by-id/*PowerMate*"

    # From uapi/linux/input-event-codes.h
    EV_SYN = 0x00
    EV_KEY = 0x01
    EV_REL = 0x02
    EV_MSC = 0x04
    SYN_REPORT = 0
    BTN_MISC = 0x100
    REL_DIAL = 0x07
    MSC_PULSELED = 0x01

    def __init__(self, handle_rotate=None, handle_button=None):
        """
        Configure a PowerMate interface.

        ``handle_rotate`` is an optional coroutine function called for rotate
        events. It receives the event value, usually +1 / -1 for right / left
        respectively, but up to +7 / -7 when rotating fast, and the event
        timestamp.

        ``handle_button`` is an optional coroutine function called for button
        events. It receives ``True`` / ``False`` for down / up respectively,
        and the event timestamp.

        If you need to track state, for example to detect double clicks, you
        can make ``handle_rotate`` and ``handle_button`` methods of a class.

        """
        self.handle_rotate = handle_rotate
        self.handle_button = handle_button
        # File object for the device; set by run() when it opens the device.
        self.device = None

    async def read_event(self):
        """
        Read a single event from device.

        The blocking read runs in the default executor so the event loop
        stays responsive.

        """
        loop = asyncio.get_running_loop()
        raw = await loop.run_in_executor(
            None, self.device.read, PowerMateEvent.SIZE
        )
        assert len(raw) == PowerMateEvent.SIZE, "incomplete read"
        parsed = PowerMateEvent.parse(raw)
        self.LOGGER.debug("read event %r", parsed)
        return parsed

    async def write_event(self, type_, code, value):
        """
        Write a single event, timestamped with the current time, to device.

        """
        outgoing = PowerMateEvent(time.time(), type_, code, value)
        self.LOGGER.debug("write event %r", outgoing)
        payload = outgoing.serialize()
        loop = asyncio.get_running_loop()
        bytes_written = await loop.run_in_executor(
            None, self.device.write, payload
        )
        assert bytes_written == len(payload), "incomplete write"

    async def handle_event(self, event):
        """
        Process a single event read from device.

        Button and rotate events are dispatched to the configured callbacks;
        sync and LED events are only sanity-checked; anything else is logged
        as an error.

        """
        kind = event.type_

        if kind == self.EV_SYN:
            assert event.code == self.SYN_REPORT
            return

        if kind == self.EV_KEY:
            assert event.code == self.BTN_MISC
            pressed = bool(event.value)
            self.LOGGER.debug("button %s", "down" if pressed else "up")
            if self.handle_button is not None:
                await self.handle_button(pressed, event.time_)
            return

        if kind == self.EV_REL:
            assert event.code == self.REL_DIAL
            steps = event.value
            direction = "right" if steps > 0 else "left"
            self.LOGGER.debug("rotate %d %s", abs(steps), direction)
            if self.handle_rotate is not None:
                await self.handle_rotate(steps, event.time_)
            return

        if kind == self.EV_MSC:
            assert event.code == self.MSC_PULSELED
            return

        self.LOGGER.error("unsupported event type %02x", kind)

    async def run(self, input_=None):
        """
        Process stream of events received from PowerMate.

        This function calls ``handle_rotate`` and ``handle_button`` as events
        are received. It loops until it is cancelled or an error occurs.

        If ``input_`` is provided, it must be a file name under ``/dev/input``
        corresponding to a PowerMate. If it isn't provided and a single
        PowerMate is attached to the system, connect to that PowerMate;
        otherwise raise ``RuntimeError``.

        """
        if input_ is None:
            candidates = glob.glob(self.DEV_INPUT)
            if len(candidates) != 1:
                raise RuntimeError("zero or several PowerMate found")
            (input_,) = candidates
        self.LOGGER.debug("reading events from %s", input_)

        # Checking /proc/bus/input/handlers or using the EVIOCGNAME ioctl
        # would confirm that this really is a PowerMate, but that isn't
        # really useful in practice.
        with open(input_, mode="r+b", buffering=0) as self.device:
            try:
                while True:
                    await self.handle_event(await self.read_event())
            finally:
                # The device echoes what is written to it, so a dummy event
                # wakes up the executor thread stuck in a blocking read.
                # This hack is necessary because there's no way to kill a
                # thread blocked on I/O.
                await self.write_event(self.EV_SYN, self.SYN_REPORT, 0)

    async def set_led(
        self,
        brightness=1,
        speed=1,
        pulse_table=0,
        pulse_asleep=False,
        pulse_awake=False,
    ):
        """
        Configure LED.

        ``brightness`` is a float between 0 and 1.

        ``speed`` is a float between 0 and 2, where 1 is the standard speed,
        and only values really close to 1 are usable.

        ``pulse_table`` is 0, 1, or 2. It sets the type of pulse.

        ``pulse_asleep`` and ``pulse_awake`` set whether the PowerMate pulses
        when the computer is asleep or awake respectively.

        """
        self.LOGGER.debug(
            "set LED brightness=%.3f speed=%.3f "
            "pulse_table=%d pulse_asleep=%s pulse_awake=%s",
            brightness,
            speed,
            pulse_table,
            pulse_asleep,
            pulse_awake,
        )

        # Scale each setting to its integer field and check its range.
        level = int(brightness * 255)
        assert 0 <= level <= 255
        rate = int(speed * 255)
        assert 0 <= rate <= 510
        table = int(pulse_table)
        assert 0 <= table <= 2
        asleep = int(pulse_asleep)
        assert 0 <= asleep <= 1
        awake = int(pulse_awake)
        assert 0 <= awake <= 1

        # Pack the fields into the single integer carried by the event.
        command = (
            level
            | (rate << 8)
            | (table << 17)
            | (asleep << 19)
            | (awake << 20)
        )
        await self.write_event(self.EV_MSC, self.MSC_PULSELED, command)
# ~~~~~~~~ ~~~~~~~~ Beocreate 2 connector ~~~~~~~~ ~~~~~~~~ #
@dataclasses.dataclass
class BeocreateEvent:
    """
    Message exchanged with a Beocreate 2 server over its WebSocket connection.

    On the wire, an event is a JSON object with ``target`` and ``header``
    keys and an optional ``content`` key.

    """

    target: str
    header: str
    content: object = None

    @classmethod
    def parse(cls, msg):
        """
        Build an event from a JSON message; ``content`` is ``None`` if absent.

        """
        fields = json.loads(msg)
        return cls(
            target=fields["target"],
            header=fields["header"],
            content=fields.get("content"),
        )

    def serialize(self):
        """
        Return the event as a JSON string, leaving out an empty ``content``.

        """
        payload = {
            key: item
            for key, item in dataclasses.asdict(self).items()
            if not (key == "content" and item is None)
        }
        return json.dumps(payload)
class Beocreate:
    """
    Interface with a Beocreate 2 server.

    ``run()`` keeps a WebSocket connection open, tracks the volume and the
    play / pause status reported by the server, and forwards changes to the
    callbacks given to the constructor. The other methods send commands over
    that connection, so they only work while ``run()`` is executing.

    """

    LOGGER = logging.getLogger("beocreate")

    # Default WebSocket endpoint, used when run() isn't given one.
    BEO_WS = "ws://localhost/"

    def __init__(self, handle_volume=None, handle_playpause=None):
        """
        Configure a Beocreate 2 server interface.

        ``handle_volume`` is an optional coroutine function called for volume
        events. It receives the volume as a value between 0 and 100.

        ``handle_playpause`` is an optional coroutine function called for
        player status events. It receives the play or pause status as
        ``True`` or ``False`` respectively.

        """
        self.websocket = None
        self.volume = None
        self.handle_volume = handle_volume
        self.playing = None
        self.handle_playpause = handle_playpause

    async def handle_event(self, event):
        """
        Process a single event read from Beocreate 2 server.

        Only "systemVolume" events of the "sound" target and "sources" events
        of the "sources" target are handled; other events are ignored.

        """
        if event.target == "sound":
            if event.header == "systemVolume":
                self.volume = event.content["volume"]
                self.LOGGER.debug("volume %d", self.volume)
                if self.handle_volume is not None:
                    await self.handle_volume(self.volume)

        elif event.target == "sources":
            if event.header == "sources":
                try:
                    source_name = event.content["focusedSource"]
                    source = event.content["sources"][source_name]
                    self.playing = source["playerState"] == "playing"
                except (KeyError, TypeError):
                    # No focused source, no state for it, or no usable
                    # content at all (e.g. missing / null): in every case,
                    # nothing is known to be playing. Catching TypeError
                    # keeps a malformed message from killing the connector.
                    self.playing = False
                self.LOGGER.debug(
                    "player %s", "playing" if self.playing else "stopped"
                )
                if self.handle_playpause is not None:
                    await self.handle_playpause(self.playing)

    async def run(self, beo_ws=None):
        """
        Process stream of events received from Beocreate 2 server.

        This function calls ``handle_volume`` and ``handle_playpause`` as
        events are received. It returns when the connection is closed.

        If ``beo_ws`` is provided, it is the address of the WebSocket endpoint
        of the Beocreate 2 server. Otherwise ``BEO_WS`` is used.

        """
        if beo_ws is None:
            beo_ws = self.BEO_WS
        self.LOGGER.debug("reading events from %s", beo_ws)

        try:
            async with websockets.connect(
                beo_ws, subprotocols=["beocreate"],
            ) as self.websocket:
                assert self.websocket.subprotocol == "beocreate"
                # Ask for the current state so the callbacks fire once at
                # startup instead of waiting for the first change.
                await self.send_event("sound", "getVolume")
                await self.send_event("sources", "getSources")
                async for msg in self.websocket:
                    event = BeocreateEvent.parse(msg)
                    self.LOGGER.debug("read event %r", event)
                    await self.handle_event(event)
        finally:
            # Reset even when cancelled or when the connection fails, so that
            # send_event() never sees a stale, closed connection.
            self.websocket = None

    async def send_event(self, target, header, content=None):
        """
        Send an event to Beocreate 2 server.

        Must be called while ``run()`` is executing.

        """
        assert self.websocket is not None, "send_event() outside of run()"
        event = BeocreateEvent(target, header, content)
        self.LOGGER.debug("write event %r", event)
        await self.websocket.send(event.serialize())

    async def set_volume(self, volume):
        """
        Set volume.

        """
        self.LOGGER.debug("set volume to %d", volume)
        await self.send_event("sound", "setVolume", volume)

    async def toggle_playing(self):
        """
        Toggle between play and pause.

        """
        self.LOGGER.debug("toggle playing")
        content = {"action": "playPause"}
        await self.send_event("now-playing", "transport", content)
# ~~~~~~~~ ~~~~~~~~ Main program ~~~~~~~~ ~~~~~~~~ #
async def main(input_=None, beo_ws=None):
    """
    Control a Beocreate 2 server with a PowerMate.

    Rotating the PowerMate changes the volume, pressing it toggles between
    play and pause, and its LED reflects the volume while playing.

    ``input_`` is the PowerMate device file and ``beo_ws`` is the WebSocket
    endpoint of the Beocreate 2 server; both are forwarded to the connectors,
    which pick a default when they're ``None``.

    Runs until SIGINT or SIGTERM is received or either connector stops, then
    shuts both connectors down.

    """
    # State shared by the callbacks below.
    volume = 0
    playing = False

    async def set_led():
        # Bright in proportion to the volume while playing, dim otherwise.
        await powermate.set_led(volume / 100 if playing else 0.1)

    async def handle_rotate(rotation, timestamp):
        nonlocal volume
        # Keep the volume within 0..100: outside this range, the server gets
        # a bogus value and the LED brightness check in set_led() fails.
        volume = max(0, min(100, volume + rotation))
        await beocreate.set_volume(volume)
        await set_led()

    async def handle_button(down, timestamp):
        nonlocal playing
        if down:
            playing = not playing
            await beocreate.toggle_playing()
            await set_led()

    async def handle_volume(new_volume):
        nonlocal volume
        if new_volume != volume:
            volume = new_volume
            await set_led()

    async def handle_playpause(new_playing):
        nonlocal playing
        if new_playing != playing:
            playing = new_playing
            await set_led()

    loop = asyncio.get_running_loop()

    powermate = PowerMate(handle_rotate, handle_button)
    powermate_task = loop.create_task(powermate.run(input_))

    beocreate = Beocreate(handle_volume, handle_playpause)
    beocreate_task = loop.create_task(beocreate.run(beo_ws))

    stop = loop.create_future()

    def request_stop():
        # A second signal must not fail on an already resolved future.
        if not stop.done():
            stop.set_result(None)

    loop.add_signal_handler(signal.SIGINT, request_stop)
    loop.add_signal_handler(signal.SIGTERM, request_stop)

    await asyncio.wait(
        [powermate_task, beocreate_task, stop],
        return_when=asyncio.FIRST_COMPLETED,
    )

    # Stop the Beocreate connector first: the PowerMate callbacks depend on it.
    for task, failure in (
        (beocreate_task, "beocreate connector failed"),
        (powermate_task, "powermate connector failed"),
    ):
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            # Expected: the connector was still running and got cancelled.
            pass
        except Exception:
            # The connector crashed; log the traceback and keep shutting down
            # so the other connector is still cleaned up.
            logging.exception(failure)
        else:
            # The connector had already returned on its own, which isn't
            # supposed to happen while the program is running.
            logging.error(failure)
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("websockets").setLevel(logging.INFO)

    # Optional positional arguments: the device path, then the WebSocket URL.
    # Each one is recognized by the prefix expected at its position.
    cli_args = sys.argv[1:]
    expected_prefixes = ("/", "ws")
    arguments_ok = len(cli_args) <= len(expected_prefixes) and all(
        arg.startswith(prefix) for arg, prefix in zip(cli_args, expected_prefixes)
    )

    if not arguments_ok:
        sys.stderr.write(
            f"usage: {sys.argv[0]} [/dev/input/eventX] [ws://beocreate2/]\n"
        )
        sys.exit(2)

    asyncio.run(main(*cli_args))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment