Skip to content

Instantly share code, notes, and snippets.

@jowlo
Created December 2, 2018 18:46
Show Gist options
  • Save jowlo/361564c16e0e5108aee36f2f1360a96f to your computer and use it in GitHub Desktop.
Save jowlo/361564c16e0e5108aee36f2f1360a96f to your computer and use it in GitHub Desktop.
#!/bin/python
import subprocess
import RPi.GPIO as GPIO
import time
import argparse
import daemon
import socket
import json
class GPIOPin:
    """A single GPIO channel with optional active-low ("inverted") logic.

    Args:
        label: Free-form name for the pin; stored but not otherwise used here.
        pin: Channel number passed straight to ``GPIO.setup``/``GPIO.output``.
        inverted: When true, logical "on" is raw level 0 and "off" is raw 1.
        direction: Channel direction handed to ``GPIO.setup``.

    Note:
        An output channel is driven to raw level 0 on construction. For an
        inverted pin that raw level corresponds to the logical "on" state.
    """

    def __init__(self, label, pin, inverted=False, direction=GPIO.OUT):
        self.label = label
        self.pin = pin
        self.inverted = inverted
        self.direction = direction
        GPIO.setup(self.pin, self.direction)
        # Only an output channel may be driven; writing to a channel that was
        # configured as an input is rejected by the GPIO library.
        if self.direction == GPIO.OUT:
            GPIO.output(self.pin, 0)

    def on(self):
        """Drive the pin to its logical "on" level."""
        GPIO.output(self.pin, 0 if self.inverted else 1)

    def off(self):
        """Drive the pin to its logical "off" level."""
        GPIO.output(self.pin, 1 if self.inverted else 0)

    @property
    def state(self):
        """Return the logical state as a bool (True means "on")."""
        # XOR the raw level with the inversion flag so both branches yield
        # the same type.
        return bool(GPIO.input(self.pin)) != bool(self.inverted)

    def toggle(self):
        """Flip the logical state of the pin.

        Goes through ``on()``/``off()`` so the inversion is applied exactly
        once; writing ``not self.state`` directly would re-write the current
        raw level on an inverted pin and leave it unchanged.
        """
        if self.state:
            self.off()
        else:
            self.on()

    def xtoggle(self, times, delay=.2):
        """Toggle the pin ``2 * times`` times, sleeping ``delay`` s after each.

        An even number of toggles leaves the pin in its starting state, so
        this produces ``times`` pulses.
        """
        for _ in range(2 * times):
            self.toggle()
            time.sleep(delay)
class Stereo:
    """Groups the relay pin and the DAC pin that together control the stereo.

    Args:
        relay: Pin-like object providing ``on()``, ``off()`` and ``state``.
        dac: Pin-like object providing ``xtoggle(times, delay=...)``.
    """

    def __init__(self, relay, dac):
        self.relay, self.dac = relay, dac

    def on(self):
        """Switch the relay on, pulse the DAC pin, then send the IR command."""
        ir_command = ['irsend', 'SEND_ONCE', 'YamahaRAS5', 'line_1']

        self.relay.on()
        # Short pause between switching the relay and pulsing the DAC pin.
        time.sleep(.3)
        self.dac.xtoggle(2, delay=.2)
        subprocess.run(ir_command)

    def off(self):
        """Switch the relay off; the DAC pin is left untouched."""
        self.relay.off()

    @property
    def state(self):
        """Current state of the relay pin."""
        return self.relay.state
class AbstractOutput:
    """Base class for outputs; every operation is a no-op by default."""

    def mute(self):
        """Silence the output (does nothing in the base class)."""
        return None

    def unmute(self):
        """Re-enable the output (does nothing in the base class)."""
        return None

    def set_volume(self, volume):
        """Apply ``volume`` to the output (does nothing in the base class)."""
        return None
class DummyOutput(AbstractOutput):
    """Output that only prints what it is asked to do.

    Args:
        name: Client name; printed in every message.
    """

    def __init__(self, name):
        self.name = name
        self.muted = False

    def mute(self):
        """Mark the output as muted, printing a message on the transition."""
        if self.muted:
            return  # already muted: nothing to report
        print("Client [", self.name, "] muted")
        self.muted = True

    def unmute(self):
        """Mark the output as unmuted, printing a message on the transition."""
        if not self.muted:
            return  # already unmuted: nothing to report
        print("Client [", self.name, "] unmuted")
        self.muted = False

    def set_volume(self, volume):
        """Print the requested volume; no state is stored."""
        print("Client [", self.name, "] volume set to:", volume)
class StereoOutput(AbstractOutput):
    """Output that switches a stereo off on mute and back on on unmute.

    Args:
        name: Client name; printed in every message.
        stereo: Object providing ``on()`` and ``off()``.
    """

    def __init__(self, name, stereo):
        self.name = name
        self.stereo = stereo
        self.muted = False

    def mute(self):
        """On the unmuted -> muted transition, report it and switch off."""
        if self.muted:
            return  # repeated mute requests must not touch the stereo again
        print("Client [", self.name, "] muted")
        self.muted = True
        self.stereo.off()

    def unmute(self):
        """On the muted -> unmuted transition, report it and switch on."""
        if not self.muted:
            return  # repeated unmute requests must not touch the stereo again
        print("Client [", self.name, "] unmuted")
        self.muted = False
        self.stereo.on()

    def set_volume(self, volume):
        """Print the requested volume; the stereo itself is not adjusted."""
        print("Client [", self.name, "] volume set to:", volume)
class Listener:
    """Line-based JSON-RPC 2.0 client that maps volume events onto outputs.

    On construction it connects to ``ip:port``, requests the server status and
    remembers, per client id, the output whose ``name`` equals the client's
    host name. ``listen()`` then dispatches ``Client.OnVolumeChanged``
    notifications to those outputs.

    Args:
        ip: Host to connect to.
        port: TCP port to connect to.
        outputs: Iterable of objects with ``name``, ``mute()``, ``unmute()``
            and ``set_volume(percent)``.

    Raises:
        OSError: If the connection cannot be established.
        ConnectionError: If the server closes the connection before the
            status response arrives.
        AssertionError: If no client reported by the server matches any of
            the given outputs.
    """

    def __init__(self, ip, port, outputs):
        self.ip = ip
        self.port = port
        self._clients = {}
        self.outputs = outputs
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._socket.connect((self.ip, self.port))
            # JSON text is exchanged as UTF-8; do not depend on the locale.
            self.file = self._socket.makefile("rw", encoding="utf-8")
            self.get_clients()
        except Exception:
            # Do not leak the socket when setup fails half-way.
            self.close()
            raise

    def close(self):
        """Close the file wrapper and the socket; safe to call repeatedly."""
        for resource in (getattr(self, "file", None), self._socket):
            if resource is None:
                continue
            try:
                resource.close()
            except OSError:
                # Best-effort cleanup: the peer may already be gone.
                pass

    def _read_response(self, request_id):
        """Read lines until the response carrying ``request_id`` arrives.

        Messages with a different (or no) id, e.g. notifications that arrive
        before the response, are skipped.
        """
        while True:
            line = self.file.readline()
            if not line:
                raise ConnectionError(
                    "connection closed while waiting for response "
                    f"{request_id}"
                )
            if not line.strip():
                continue
            message = json.loads(line)
            if isinstance(message, dict) and message.get("id") == request_id:
                return message

    def get_clients(self):
        """Query the server status and fill the client-id -> output map."""
        self.file.write('{"id":8,"jsonrpc":"2.0","method":"Server.GetStatus"} \n')
        self.file.flush()
        status = self._read_response(8)

        # Later outputs win over earlier ones that share the same name.
        by_name = {output.name: output for output in self.outputs}
        for group in status["result"]["server"]["groups"]:
            for client in group["clients"]:
                output = by_name.get(client["host"]["name"])
                if output is not None:
                    self._clients[client["id"]] = output

        if not self._clients:
            # Raised explicitly (not via ``assert``) so the check survives -O.
            raise AssertionError(
                "no client on the server matches any configured output"
            )

    def listen(self):
        """Dispatch incoming messages until the server closes the connection."""
        # readline() returns '' only at EOF, which ends the loop cleanly.
        for line in iter(self.file.readline, ""):
            if not line.strip():
                continue
            self.handle(json.loads(line))

    def handle(self, p):
        """Dispatch one decoded message.

        Anything that is not a JSON-RPC 2.0 object is ignored, as are
        messages without a ``method`` and methods other than
        ``Client.OnVolumeChanged``.
        """
        if not isinstance(p, dict) or p.get('jsonrpc') != '2.0':
            return
        if p.get('method') == 'Client.OnVolumeChanged':
            params = p["params"]
            self.handle_volume_change(params["id"], params["volume"])

    def handle_volume_change(self, client_id, volume):
        """Apply a volume event to the output registered for ``client_id``.

        A muted event only mutes; an unmuted event unmutes and then forwards
        ``volume["percent"]``. Events for unknown clients are ignored.
        """
        output = self._clients.get(client_id)
        if output is None:
            return
        if volume["muted"]:
            output.mute()
        else:
            output.unmute()
            output.set_volume(volume["percent"])
def start_listener(ip, port):
    """Wire up the stereo hardware and listen on ``ip:port`` for events.

    Sets BCM pin numbering, creates the relay pin (channel 3, inverted) and
    the DAC pin (channel 17), wraps them in a ``Stereo`` exposed as the
    output named "mediabox", then runs the listener loop.
    """
    # Channel numbers below use BCM numbering.
    GPIO.setmode(GPIO.BCM)

    # The relay pin is created before the DAC pin, as pin setup drives them.
    stereo = Stereo(
        relay=GPIOPin('relay', 3, True),
        dac=GPIOPin('dac', 17),
    )
    outputs = [StereoOutput("mediabox", stereo)]

    Listener(ip, port, outputs).listen()
def _set_led(label, pin, value):
    """Set up BCM channel ``pin`` and switch the LED according to ``value``."""
    GPIO.setmode(GPIO.BCM)
    led = GPIOPin(label, pin)
    if value == 'on':
        led.on()
    elif value == 'off':
        led.off()


def main():
    """Parse the command line and run the selected action.

    Exactly one of ``--listen``, ``--mpd-led``, ``--ssh-led`` or
    ``--toggle-dac`` may be given. The LED options accept only ``on`` or
    ``off``; any other value is rejected by the parser instead of being
    silently ignored. ``--daemonize``, ``--ip`` and ``--port`` only affect
    ``--listen``.
    """
    parser = argparse.ArgumentParser(description="Control Raspberry Pi GPIO outputs")
    group = parser.add_mutually_exclusive_group()
    group.add_argument('--listen', action='store_true')
    group.add_argument('--mpd-led', choices=('on', 'off'))
    group.add_argument('--ssh-led', choices=('on', 'off'))
    group.add_argument('--toggle-dac', action='store_true')
    parser.add_argument('--daemonize', action='store_true')
    parser.add_argument('--ip', default='localhost')
    parser.add_argument('--port', default=1705, type=int)
    args = parser.parse_args()

    # Control mpd led
    if args.mpd_led:
        _set_led('mpd_led', 23, args.mpd_led)

    # Control ssh led
    if args.ssh_led:
        _set_led('ssh_led', 22, args.ssh_led)

    if args.toggle_dac:
        GPIO.setmode(GPIO.BCM)
        dac = GPIOPin('dac', 17)
        dac.xtoggle(1)

    if args.listen:
        if args.daemonize:
            # Detach from the terminal before opening the connection.
            with daemon.DaemonContext():
                start_listener(args.ip, args.port)
        else:
            start_listener(args.ip, args.port)
# Run only when executed as a script, so importing this module has no side
# effects on the GPIO pins or the network.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment