Created
December 2, 2018 18:46
-
-
Save jowlo/361564c16e0e5108aee36f2f1360a96f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python | |
import subprocess | |
import RPi.GPIO as GPIO | |
import time | |
import argparse | |
import daemon | |
import socket | |
import json | |
class GPIOPin:
    """A single labelled GPIO channel with optional inverted (active-low) logic.

    Args:
        label: Free-form name for the pin; stored but not used by this class.
        pin: Channel number handed to ``GPIO.setup`` / ``GPIO.output``.
        inverted: When true, the logical "on" state is electrical level 0.
        direction: Direction passed to ``GPIO.setup`` (default ``GPIO.OUT``).
    """

    def __init__(self, label, pin, inverted=False, direction=GPIO.OUT):
        self.label = label
        self.pin = pin
        self.inverted = inverted
        self.direction = direction
        GPIO.setup(self.pin, self.direction)
        # The initial electrical level is always 0, independent of
        # ``inverted`` -- an inverted pin therefore starts logically "on".
        GPIO.output(self.pin, 0)

    def on(self):
        """Drive the pin to its logical "on" level."""
        GPIO.output(self.pin, 0 if self.inverted else 1)

    def off(self):
        """Drive the pin to its logical "off" level."""
        GPIO.output(self.pin, 1 if self.inverted else 0)

    @property
    def state(self):
        """Logical state of the pin, read back via ``GPIO.input``.

        For an inverted pin this is the negated raw reading (a ``bool``);
        otherwise the raw value returned by ``GPIO.input`` is passed through.
        """
        if self.inverted:
            return not GPIO.input(self.pin)
        return GPIO.input(self.pin)

    def toggle(self):
        """Flip the logical state of the pin.

        Goes through ``on()``/``off()`` so the inversion is applied when
        writing.  Writing ``not self.state`` as a raw level would, for an
        inverted pin, re-write the level the pin already has.
        """
        if self.state:
            self.off()
        else:
            self.on()

    def xtoggle(self, times, delay=.2):
        """Toggle the pin ``2 * times`` times, sleeping ``delay`` seconds after each.

        An even number of toggles leaves the pin in the state it started in.
        """
        for _ in range(2 * times):
            self.toggle()
            time.sleep(delay)
class Stereo:
    """Power control for a stereo built from a relay pin and a DAC pin.

    Args:
        relay: Pin-like object providing ``on()``, ``off()`` and ``state``.
        dac: Pin-like object providing ``xtoggle(times, delay=...)``.
    """

    def __init__(self, relay, dac):
        self.relay, self.dac = relay, dac

    def on(self):
        """Switch the relay on, pulse the DAC pin, then send an IR command.

        The IR command is issued through the external ``irsend`` program
        (presumably LIRC selecting the receiver's line input -- confirm).
        """
        ir_command = ['irsend', 'SEND_ONCE', 'YamahaRAS5', 'line_1']
        self.relay.on()
        # Pause before touching the DAC pin; the reason is not stated in the
        # code (presumably lets the hardware power up -- confirm).
        time.sleep(0.3)
        self.dac.xtoggle(2, delay=0.2)
        subprocess.run(ir_command)

    def off(self):
        """Switch the relay off."""
        self.relay.off()

    @property
    def state(self):
        """Current state as reported by the relay pin."""
        relay_state = self.relay.state
        return relay_state
class AbstractOutput:
    """Base interface for outputs; every operation is a no-op here."""

    def mute(self):
        """Mute the output (no-op in the base class)."""
        return None

    def unmute(self):
        """Unmute the output (no-op in the base class)."""
        return None

    def set_volume(self, volume):
        """Apply ``volume`` to the output (no-op in the base class)."""
        return None
class DummyOutput(AbstractOutput):
    """Output that only prints what it is asked to do.

    Args:
        name: Name shown in the printed messages.
    """

    def __init__(self, name):
        self.name = name
        self.muted = False

    def mute(self):
        """Print a message and mark the output muted; no-op if already muted."""
        if self.muted:
            return
        print("Client [", self.name, "] muted")
        self.muted = True

    def unmute(self):
        """Print a message and mark the output unmuted; no-op if not muted."""
        if not self.muted:
            return
        print("Client [", self.name, "] unmuted")
        self.muted = False

    def set_volume(self, volume):
        """Print the requested volume; nothing is stored."""
        print("Client [", self.name, "] volume set to:", volume)
class StereoOutput(AbstractOutput):
    """Output that switches a stereo off when muted and on when unmuted.

    Args:
        name: Name shown in the printed messages.
        stereo: Object providing ``on()`` and ``off()``.
    """

    def __init__(self, name, stereo):
        self.name = name
        self.muted = False
        self.stereo = stereo

    def mute(self):
        """Turn the stereo off on the first mute; repeated mutes are ignored."""
        if self.muted:
            return
        print("Client [", self.name, "] muted")
        # The flag is updated before the hardware call.
        self.muted = True
        self.stereo.off()

    def unmute(self):
        """Turn the stereo on when leaving the muted state; otherwise ignored."""
        if not self.muted:
            return
        print("Client [", self.name, "] unmuted")
        # The flag is updated before the hardware call.
        self.muted = False
        self.stereo.on()

    def set_volume(self, volume):
        """Print the requested volume; the stereo itself is not adjusted."""
        print("Client [", self.name, "] volume set to:", volume)
class Listener:
    """Forward volume/mute notifications from a JSON-RPC server to outputs.

    Messages are exchanged as one JSON document per line over a TCP socket.
    The method names used (``Server.GetStatus``, ``Client.OnVolumeChanged``)
    look like the Snapcast control protocol -- confirm against the server.

    Args:
        ip: Host name or address of the server.
        port: TCP port of the server.
        outputs: Objects exposing ``name``, ``mute()``, ``unmute()`` and
            ``set_volume(percent)``; they are matched to the server's
            clients by host name.

    Raises:
        OSError: If the connection cannot be established.
        RuntimeError: If no output matches any client known to the server.
    """

    # Must agree with the "id" embedded in the request sent by get_clients().
    _STATUS_REQUEST_ID = 8

    def __init__(self, ip, port, outputs):
        self.ip = ip
        self.port = port
        self._clients = {}
        self.outputs = outputs
        self.file = None
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect((self.ip, self.port))
            self.file = self._socket.makefile("rw")
            self.get_clients()
        except Exception:
            # Do not leak the socket when setup fails part-way.
            self.close()
            raise

    def close(self):
        """Close the text stream and the underlying socket."""
        try:
            if self.file is not None:
                self.file.close()
        finally:
            self.file = None
            self._socket.close()

    def get_clients(self):
        """Query the server status and map client ids to matching outputs.

        A client is matched when its ``host.name`` equals an output's
        ``name``; if several outputs share a name, the last one wins.

        Raises:
            ConnectionError: If the stream ends before the status reply.
            RuntimeError: If the reply has no result, or no client matches.
        """
        self.file.write('{"id":8,"jsonrpc":"2.0","method":"Server.GetStatus"} \n')
        self.file.flush()
        status = self._read_status_reply()

        outputs_by_name = {output.name: output for output in self.outputs}
        for group in status["result"]["server"]["groups"]:
            for client in group["clients"]:
                output = outputs_by_name.get(client["host"]["name"])
                if output is not None:
                    self._clients[client["id"]] = output

        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not self._clients:
            raise RuntimeError(
                "none of the configured outputs matches a client known to the server"
            )

    def _read_status_reply(self):
        """Return the reply to the status request, skipping unrelated messages.

        The server may push a notification before it answers, so the first
        line read is not necessarily the reply.
        """
        while True:
            line = self.file.readline()
            if not line:
                raise ConnectionError(
                    "server closed the connection before replying to Server.GetStatus"
                )
            if not line.strip():
                continue
            message = json.loads(line)
            if isinstance(message, dict) and message.get("id") == self._STATUS_REQUEST_ID:
                if "result" not in message:
                    raise RuntimeError(
                        "Server.GetStatus failed: {!r}".format(message.get("error"))
                    )
                return message

    def listen(self):
        """Read messages line by line and dispatch each one to ``handle``.

        Blocks until the stream ends.

        Raises:
            ConnectionError: When the server closes the connection.
        """
        while True:
            line = self.file.readline()
            if not line:
                # readline() returns '' only at end of stream.
                raise ConnectionError("server closed the connection")
            if not line.strip():
                continue
            self.handle(json.loads(line))

    def handle(self, p):
        """Dispatch one decoded JSON-RPC message.

        Only ``Client.OnVolumeChanged`` notifications are acted upon; other
        methods, and messages without a ``method`` (e.g. responses), are
        ignored.

        Raises:
            ValueError: If ``p`` is not a JSON-RPC 2.0 object.
        """
        if not isinstance(p, dict) or p.get('jsonrpc') != '2.0':
            raise ValueError("not a JSON-RPC 2.0 message: {!r}".format(p))
        if p.get('method') == 'Client.OnVolumeChanged':
            params = p["params"]
            self.handle_volume_change(params["id"], params["volume"])

    def handle_volume_change(self, client_id, volume):
        """Apply a volume notification to the output mapped to ``client_id``.

        Args:
            client_id: Client id from the notification; unknown ids are ignored.
            volume: Mapping with ``"muted"`` and ``"percent"`` entries.
        """
        output = self._clients.get(client_id)
        if output is None:
            return
        if volume["muted"]:
            output.mute()
        else:
            output.unmute()
        output.set_volume(volume["percent"])
def start_listener(ip, port):
    """Set up the stereo hardware and listen for volume notifications.

    Uses BCM pin numbering, with an inverted relay pin on channel 3 and a
    DAC pin on channel 17.  The stereo is exposed as the output named
    "mediabox".  The call blocks inside ``Listener.listen()``.

    Args:
        ip: Host name or address of the JSON-RPC server.
        port: TCP port of the server.
    """
    # Select pin numbering before any pin is configured.
    GPIO.setmode(GPIO.BCM)

    # Hardware layer: the relay is created before the DAC pin.
    amplifier = Stereo(
        GPIOPin('relay', 3, True),
        GPIOPin('dac', 17),
    )

    # Abstraction layer: one output wired to the stereo.
    outputs = [StereoOutput("mediabox", amplifier)]
    Listener(ip, port, outputs).listen()
def main():
    """Parse the command line and run the selected GPIO action.

    ``--listen``, ``--mpd-led``, ``--ssh-led`` and ``--toggle-dac`` are
    mutually exclusive.  ``--daemonize`` only has an effect together with
    ``--listen``.  The LED options act on the values ``on`` and ``off``; any
    other non-empty value only sets the pin up.
    """
    parser = argparse.ArgumentParser(description="Control Raspberry Pi GPIO outputs")
    actions = parser.add_mutually_exclusive_group()
    actions.add_argument('--listen', action='store_true')
    actions.add_argument('--mpd-led')
    actions.add_argument('--ssh-led')
    actions.add_argument('--toggle-dac', action='store_true')
    parser.add_argument('--daemonize', action='store_true')
    parser.add_argument('--ip', default='localhost')
    parser.add_argument('--port', default=1705, type=int)
    args = parser.parse_args()

    # LED control: (pin label, BCM channel, requested value).
    led_requests = (
        ('mpd_led', 23, args.mpd_led),
        ('ssh_led', 22, args.ssh_led),
    )
    for label, channel, wanted in led_requests:
        if not wanted:
            continue
        # Select pin numbering before the pin is configured.
        GPIO.setmode(GPIO.BCM)
        led = GPIOPin(label, channel)
        if wanted == 'on':
            led.on()
        elif wanted == 'off':
            led.off()

    if args.toggle_dac:
        GPIO.setmode(GPIO.BCM)
        GPIOPin('dac', 17).xtoggle(1)

    if not args.listen:
        return
    if not args.daemonize:
        start_listener(args.ip, args.port)
        return
    with daemon.DaemonContext():
        start_listener(args.ip, args.port)
# Run only when executed as a script, so importing this module does not
# parse the command line or touch the GPIO pins.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment