Skip to content

Instantly share code, notes, and snippets.

@huwcbjones
Last active March 19, 2022 16:57
Show Gist options
  • Save huwcbjones/ebde008eeac4d8ea5bf038eebad992a3 to your computer and use it in GitHub Desktop.
Save huwcbjones/ebde008eeac4d8ea5bf038eebad992a3 to your computer and use it in GitHub Desktop.
Simple python script to indicate librespot status via the Raspberry Pi PWR/ACT LEDs
SUBSYSTEM=="leds", ACTION=="add", RUN+="/bin/chgrp -R leds /sys%p", RUN+="/bin/chmod -R g=u /sys%p"
SUBSYSTEM=="leds", ACTION=="change", ENV{TRIGGER}!="none", RUN+="/bin/chgrp -R leds /sys%p", RUN+="/bin/chmod -R g=u /sys%p"
  1. Install files
    1. Copy spotipi-server/spotipi-event to /usr/local/bin with permissions 0o755
    2. Copy spotipi.service to /lib/systemd/system
    3. Copy 99-onboard-leds.rules to /etc/udev/rules.d (this allows anyone in the leds group to control the LEDs).
  2. Configure the users/groups
    1. Add the leds group: sudo groupadd leds
    2. Add the spotipi user: sudo adduser --disabled-password --quiet --system --home /var/lib/spotipi --no-create-home --gecos "Spotipi" --group spotipi --add_extra_groups leds
  3. Enable spotipi service: systemctl enable spotipi.service spotipi.socket
  4. Edit /etc/default/raspotify and add --onevent spotipi-event to OPTIONS
  5. Edit /lib/systemd/system/raspotify.service and append spotipi.service to the After=network.service line.
  6. Reboot
  7. Watch as the LEDs indicate what's happening!
    • Solid red: booting
    • Blinking red: no remote connection
    • Solid green: remote control connected/paused
    • Blinking green: music currently playing
    • Solid green + blinking red: changing track
#!/bin/bash
# Forward the player event name held in $PLAYER_EVENT to the spotipi daemon:
# echo writes it as a single line and socat relays that line to the daemon's
# UNIX socket.
# NOTE(review): spotipi.socket declares ListenStream=/run/spotipi.sock, which
# differs from the path used here - confirm which one is intended.
echo "${PLAYER_EVENT}" | socat UNIX-CONNECT:/run/spotipi/spotipi.sock -
#!/usr/bin/env python3
import asyncio
import logging
import os
import socket
from argparse import ArgumentParser
from enum import Enum
LOGGER = logging.getLogger("spotipi")
def get_led(led: str, leds_dir: str = "/sys/class/leds") -> bool:
    """Return whether the named LED is currently lit.

    The LED class reports brightness as an integer between 0 and the LED's
    ``max_brightness``, so a lit LED is not necessarily reported as exactly
    ``1`` (a trigger may leave it at e.g. ``255``). Any non-zero value is
    therefore treated as "on".

    :param led: LED name, i.e. the directory name below ``leds_dir``
    :param leds_dir: directory holding the per-LED control directories
    :return: ``True`` if the brightness is a positive integer, else ``False``
    :raises OSError: if the brightness file cannot be opened or read
    """
    with open(f"{leds_dir}/{led}/brightness", "r") as led_f:
        raw = led_f.read().strip()
    try:
        return int(raw) > 0
    except ValueError:
        # Empty or non-numeric content: report "off" rather than crashing
        return False
def set_led(led: str, state: bool):
    """Switch the named LED on or off through its sysfs brightness file.

    :param led: LED name, i.e. the directory name below ``/sys/class/leds``
    :param state: truthy writes ``1`` (on), falsy writes ``0`` (off)
    """
    payload = b"0"
    if state:
        payload = b"1"
    brightness_path = f"/sys/class/leds/{led}/brightness"
    # Unbuffered binary mode: the byte is handed to the file on write()
    with open(brightness_path, "wb+", buffering=0) as brightness:
        brightness.write(payload)
async def blink_led(led: str, time_on: float = 0.5, time_off: float = 0.5):
    """Blink the named LED forever, until the coroutine is cancelled.

    The state the LED had before blinking started is put back when the loop
    is left (e.g. on cancellation).

    :param led: LED name as accepted by ``get_led`` / ``set_led``
    :param time_on: seconds the LED stays lit in each cycle
    :param time_off: seconds the LED stays dark in each cycle
    """
    previous = get_led(led)
    # One cycle: lit for time_on seconds, then dark for time_off seconds
    phases = ((True, time_on), (False, time_off))
    try:
        while True:
            for lit, duration in phases:
                set_led(led, lit)
                await asyncio.sleep(duration)
    finally:
        set_led(led, previous)
class State(str, Enum):
    """Player events understood by the daemon, keyed by their wire name."""

    # User connected
    STARTED = "started"
    # User disconnected
    STOPPED = "stopped"
    # Track changed
    CHANGED = "changed"
    # Playback started
    PLAYING = "playing"
    # Playback paused
    PAUSED = "paused"

    @classmethod
    def _missing_(cls, value):
        """Retry a failed lookup with surrounding whitespace removed.

        Non-string values, and strings that carry no surrounding whitespace,
        are handed to the default handler so the lookup fails normally.
        """
        if not isinstance(value, str) or value.strip() == value:
            return super()._missing_(value)
        return cls(value.strip())
class SpotiPi:
    """LED status daemon driven by player events received on a UNIX socket.

    Clients send newline-terminated event names (the values of ``State``).
    Each new state starts the coroutine method named after it, which sets or
    blinks the PWR and ACT LEDs; the task of the previous state is cancelled.
    """

    def __init__(self, socket: "str | int"):
        """
        :param socket: filesystem path to listen on, or the integer file
            descriptor of an inherited UNIX stream socket
        """
        self._socket = socket
        # Last state applied by _set_state(); None until the first one
        self._state = None
        # asyncio task running the LED pattern of the current state
        self._state_task = None

    async def stopped(self):
        """No remote connection: ACT off, PWR blinking (0.2s on, 0.3s off)."""
        set_led("ACT", False)
        await blink_led("PWR", 0.2, 0.3)

    async def started(self):
        """Remote connected: shown the same way as ``paused``."""
        await self.paused()

    async def paused(self):
        """Connected or paused: PWR off, ACT solid on."""
        set_led("PWR", False)
        set_led("ACT", True)

    async def playing(self):
        """Music playing: PWR off, ACT blinking at the default rate."""
        set_led("PWR", False)
        await blink_led("ACT")

    async def changed(self):
        """Track changing: ACT solid on, PWR blinking fast (0.1s on/off)."""
        set_led("ACT", True)
        await blink_led("PWR", 0.1, 0.1)

    async def _handle_connection(self, reader, _writer):
        """Apply every event name received on one client connection.

        Lines are read until EOF. Empty lines and lines that are not a valid
        ``State`` value are ignored. The writer is closed once the client is
        done so that connections do not pile up as open transports.

        :param reader: stream reader of the accepted connection
        :param _writer: stream writer of the accepted connection (only closed)
        """
        try:
            while not reader.at_eof():
                data = (await reader.readline()).strip()
                if not data:
                    continue
                LOGGER.debug("Got data: %s", data)
                try:
                    state = State(data.decode())
                except ValueError:
                    # Unknown event name; UnicodeDecodeError is a ValueError too
                    LOGGER.debug("Ignoring unrecognised event: %s", data)
                    continue
                self._set_state(state)
        finally:
            _writer.close()
            try:
                await _writer.wait_closed()
            except OSError as err:
                # The peer is already gone; nothing left to clean up
                LOGGER.debug("Error while closing connection: %s", err)

    def _set_state(self, state: State):
        """Switch to ``state`` and start its LED pattern.

        Does nothing when ``state`` is already the current state, so repeated
        events do not restart a running blink pattern.

        :param state: the state to switch to
        """
        if self._state == state:
            return
        # Record the new state; without this the check above never matches
        self._state = state
        if self._state_task is not None and not self._state_task.done():
            self._state_task.cancel()
        LOGGER.info("Change state to: %s", state.name)
        # Handlers are the coroutine methods named after the state
        state_task = getattr(self, state.name.lower(), None)
        if state_task is None:
            return
        self._state_task = asyncio.create_task(state_task())

    async def listen(self):
        """Serve event connections forever, starting in the STOPPED state."""
        args = {}
        if isinstance(self._socket, int):
            # Inherited descriptor: wrap it in a socket object for asyncio
            args["sock"] = socket.fromfd(self._socket, socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            args["path"] = self._socket
        server = await asyncio.start_unix_server(
            self._handle_connection, **args,
        )
        LOGGER.info("Serving on %s", server.sockets[0].getsockname())
        self._set_state(State.STOPPED)
        async with server:
            await server.serve_forever()
def main():
    """Parse the command line, set up logging and run the daemon.

    When LISTEN_PID matches this process and LISTEN_FDS is non-zero, the
    daemon serves on the last descriptor passed in (descriptors start at 3);
    otherwise it listens on the SOCKET path argument.
    """
    parser = ArgumentParser()
    parser.add_argument(
        "-v",
        action="store_true",
        default=False,
        help="Enable verbose logging (default: %(default)s)",
    )
    parser.add_argument(
        "SOCKET",
        nargs="?",
        default="/run/spotipi/spotipi.sock",
        help="Socket to listen on (default: %(default)s)",
    )
    options = parser.parse_args()

    logging.basicConfig(
        format="%(asctime)s[%(levelname)-8s][%(name)s]: %(message)s",
        level=logging.DEBUG if options.v else logging.INFO,
    )

    activation_pid = int(os.environ.get("LISTEN_PID", 0))
    passed_fds = int(os.environ.get("LISTEN_FDS", 0))
    activated = activation_pid == os.getpid() and passed_fds != 0
    # Passed descriptors are numbered from 3 upwards; use the last one
    endpoint = 3 + (passed_fds - 1) if activated else options.SOCKET

    try:
        asyncio.run(SpotiPi(endpoint).listen())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a foreground run
        pass


if __name__ == "__main__":
    main()
[Unit]
Description=Spotipi Daemon
Requires=spotipi.socket
StartLimitIntervalSec=120
StartLimitBurst=5
[Service]
Type=simple
Restart=always
RestartSec=10
RemainAfterExit=no
ExecStart=/usr/local/bin/spotipi-server
User=spotipi
Group=spotipi
ProtectSystem=full
PrivateTmp=true
ProtectControlGroups=true
SystemCallFilter=@system-service
SystemCallArchitectures=native
NoNewPrivileges=true
ProtectKernelModules=true
RestrictAddressFamilies=AF_UNIX
RestrictNamespaces=true
MemoryDenyWriteExecute=true
RestrictRealtime=true
[Install]
WantedBy=sockets.target
[Unit]
Description=Spotipi Server Socket
[Socket]
ListenStream=/run/spotipi.sock
# Our service won't need permissions for the socket, since it
# inherits the file descriptor by socket activation
# only the librespot user will need access to the socket
SocketUser=raspotify
SocketMode=600
[Install]
WantedBy=sockets.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment