Skip to content

Instantly share code, notes, and snippets.

@oogali
Last active August 29, 2015 14:04
Show Gist options
  • Save oogali/9877e9958e72112ad812 to your computer and use it in GitHub Desktop.
Save oogali/9877e9958e72112ad812 to your computer and use it in GitHub Desktop.
Yamaha HTTP Control PoC
#!/usr/bin/env python
import sys
import time
import curses
import signal
import requests
from lxml import etree
class YZone:
def __init__(self, doctree=None, name=None, receiver=None):
if receiver is None:
raise Exception('Expected parent receiver object')
if doctree is None and name is None:
raise Exception('Expected tree of document data or zone name')
self.receiver = receiver
if doctree is not None:
self.name = doctree.tag
self.doctree = doctree
else:
self.name = name
self.doctree = self.get_status()
def get_status(self):
req = self.receiver.build_request(verb='GET', zone=self.name,
command='GetParam',
target=['Basic_Status'])
response = self.receiver.query(req)
return list(response.xpath('/YAMAHA_AV')[0])[0]
def get_mute(self):
req = self.receiver.build_request(verb='GET', zone=self.name,
command='GetParam',
target=['Volume', 'Mute'])
response = self.receiver.query(req)
return response.xpath('/YAMAHA_AV/{}/Volume/Mute'.format(self.name))[0].text == 'On'
def set_mute(self, on=True):
req = self.receiver.build_request(verb='PUT', zone=self.name,
command='On' if on else 'Off',
target=['Volume', 'Mute'])
response = self.receiver.query(req)
return response
def toggle_mute(self):
new_status = not self.get_mute()
self.set_mute(new_status)
return new_status
def get_volume(self):
req = self.receiver.build_request(verb='GET', zone=self.name,
command='GetParam',
target=['Volume', 'Lvl'])
response = self.receiver.query(req)
return int(response.xpath('/YAMAHA_AV/{}/Volume/Lvl/Val'.format(self.name))[0].text)
def set_volume(self, level):
req = self.receiver.build_request(verb='PUT', zone=self.name,
command=str(level),
target=['Volume', 'Lvl', 'Val'],
args={'Exp': '1', 'Unit': 'dB'})
response = self.receiver.query(req)
return response
def adjust_volume(self, offset):
volume = self.get_volume()
new_volume = volume + int(offset)
self.set_volume(new_volume)
return new_volume
class YReceiver:
def __init__(self, address=None):
if address is None:
raise Exception('You must specify a receiver IP address or hostname')
self.address = address
self.url = 'http://{}/YamahaRemoteControl/ctrl'.format(address)
self.zones = []
# load our main zone
self.create_zone('Main_Zone')
def create_zone(self, name):
zone = YZone(name=name, receiver=self)
self.zones.append(zone)
def build_request(self, verb=None, zone=None, command=None, target=None, args=None):
if verb is None:
raise Exception('A verb must be provided')
root = etree.Element('YAMAHA_AV', cmd=verb)
doc = etree.ElementTree(root)
zone_element = etree.SubElement(root, zone)
target_element = etree.SubElement(zone_element, target[0])
i = 0
for t in target[1:]:
target_element = etree.SubElement(target_element, t)
if i == len(target[1:]) - 2:
parent_target = target_element
i += 1
target_element.text = command
if args is not None:
for k, v in args.items():
element = etree.SubElement(parent_target, k)
element.text = v
return etree.tostring(doc)
def query(self, req):
r = requests.post(self.url, data=req)
return etree.fromstring(r.text)
def catch_ctrl_c(signo, frame):
curses.endwin()
def main(argv=None):
if argv is None:
argv = sys.argv
if len(argv) < 2:
print '{} <receiver ip>'.format(argv[0])
return -1
rcvr = YReceiver(address=argv[1])
zone = rcvr.zones[0]
print zone.name
print '=' * len(zone.name)
print etree.tostring(zone.doctree, pretty_print=True)
time.sleep(4)
signal.signal(signal.SIGINT, catch_ctrl_c)
screen = curses.initscr()
curses.cbreak()
screen.keypad(1)
screen.addstr(0, 10, "Hit 'q' to quit")
screen.refresh()
key = None
while key != ord('q'):
key = screen.getch()
screen.addch(20, 25, key)
screen.refresh()
if key == curses.KEY_UP:
zone.adjust_volume(+5)
elif key == curses.KEY_DOWN:
zone.adjust_volume(-5)
elif key == ord('m'):
zone.toggle_mute()
curses.endwin()
return 0
if __name__ == '__main__':
sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment