Skip to content

Instantly share code, notes, and snippets.

@prairiesnpr
Created March 31, 2019 16:15
Show Gist options
  • Save prairiesnpr/8dee06fafab4d4f5fa9da9f107d3cb75 to your computer and use it in GitHub Desktop.
Save prairiesnpr/8dee06fafab4d4f5fa9da9f107d3cb75 to your computer and use it in GitHub Desktop.
"""
Support for Onkyo Receivers.
For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/media_player.onkyo/
"""
import logging
# pylint: disable=unused-import
from typing import List # noqa: F401
import voluptuous as vol
from homeassistant.components.media_player import (
MediaPlayerDevice, PLATFORM_SCHEMA)
from homeassistant.components.media_player.const import (
SUPPORT_PLAY, SUPPORT_PLAY_MEDIA, SUPPORT_SELECT_SOURCE,
SUPPORT_TURN_OFF, SUPPORT_TURN_ON, SUPPORT_VOLUME_MUTE, SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP, DOMAIN)
from homeassistant.const import (
CONF_HOST, CONF_NAME, STATE_OFF, STATE_ON, ATTR_ENTITY_ID)
from homeassistant import data_entry_flow
import homeassistant.helpers.config_validation as cv
# Third-party packages this platform declares as requirements.
REQUIREMENTS = ['onkyo-eiscp==1.2.5', 'lxml==4.3.2']

_LOGGER = logging.getLogger(__name__)

# Configuration keys accepted by PLATFORM_SCHEMA below.
CONF_SOURCES = 'sources'
CONF_MAX_VOLUME = 'max_volume'
DEVICE_MAX_VOLUME = 'device_max_volume'

DEFAULT_NAME = 'Onkyo Receiver'

# Upper bound accepted for CONF_MAX_VOLUME; also the fallback returned by
# determine_max_volume when the receiver's answer is not in MAX_VOL_OPTS.
SUPPORTED_MAX_VOLUME = 200
DEF_MAX_VOLUME = 80

# Indexed as MAX_VOL_OPTS[volmax][volstep] with the attribute strings read
# from the receiver's zone description in determine_max_volume.
MAX_VOL_OPTS = {
    "80": {"0": 100, "1": 80},
    "82": {"0": 200, "1": 80},
}

# Feature bitmask for a player without volume control ...
SUPPORT_ONKYO_WO_VOLUME = (
    SUPPORT_TURN_ON
    | SUPPORT_TURN_OFF
    | SUPPORT_SELECT_SOURCE
    | SUPPORT_PLAY
    | SUPPORT_PLAY_MEDIA
)
# ... and the full bitmask, which adds the three volume features.
SUPPORT_ONKYO = (
    SUPPORT_ONKYO_WO_VOLUME
    | SUPPORT_VOLUME_SET
    | SUPPORT_VOLUME_MUTE
    | SUPPORT_VOLUME_STEP
)

# Hosts for which an entity has already been created by setup_platform.
KNOWN_HOSTS = []  # type: List[str]

# Default mapping of receiver source identifier -> display name.
DEFAULT_SOURCES = {
    'tv': 'TV',
    'bd': 'Bluray',
    'game': 'Game',
    'aux1': 'Aux1',
    'video1': 'Video 1',
    'video2': 'Video 2',
    'video3': 'Video 3',
    'video4': 'Video 4',
    'video5': 'Video 5',
    'video6': 'Video 6',
    'video7': 'Video 7',
    'fm': 'Radio',
}

# Source identifiers for which play_media will send a preset command.
DEFAULT_PLAYABLE_SOURCES = ("fm", "am", "tuner")

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
    vol.Optional(CONF_HOST): cv.string,
    vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
    vol.Optional(DEVICE_MAX_VOLUME): cv.positive_int,
    vol.Optional(CONF_MAX_VOLUME, default=DEF_MAX_VOLUME):
        vol.All(vol.Coerce(int), vol.Range(min=1, max=SUPPORTED_MAX_VOLUME)),
    vol.Optional(CONF_SOURCES, default=DEFAULT_SOURCES):
        {cv.string: cv.string},
})

# Text of the ValueError that determine_zones treats as "no answer".
TIMEOUT_MESSAGE = 'Timeout waiting for response.'

# Service for switching the HDMI output of one or more entities.
ATTR_HDMI_OUTPUT = 'hdmi_output'
ACCEPTED_VALUES = [
    'no',
    'analog',
    'yes',
    'out',
    'out-sub',
    'sub',
    'hdbaset',
    'both',
    'up',
]
ONKYO_SELECT_OUTPUT_SCHEMA = vol.Schema({
    vol.Required(ATTR_ENTITY_ID): cv.entity_ids,
    vol.Required(ATTR_HDMI_OUTPUT): vol.In(ACCEPTED_VALUES),
})
SERVICE_SELECT_HDMI_OUTPUT = 'onkyo_select_hdmi_output'
def determine_max_volume(receiver):
    """Query the receiver to get the max supported volume.

    Sends the raw ``NRI`` command, drops the first three characters of the
    reply and parses the remainder as XML.  The ``volmax`` and ``volstep``
    attributes of the zone whose ``id`` is 1 are then looked up in
    MAX_VOL_OPTS.

    Returns the value from MAX_VOL_OPTS, or SUPPORTED_MAX_VOLUME when the
    value cannot be determined: the query timed out, the reply has no
    zone list or no zone with id 1, or the attribute combination is not
    listed in MAX_VOL_OPTS.

    Raises ValueError if ``receiver.raw`` fails with anything other than
    the timeout message; XML parse errors from lxml are not caught.
    """
    from lxml import etree

    try:
        dev_info = receiver.raw('NRI')[3:]
    except ValueError as error:
        # Same convention as determine_zones: a timeout is treated as
        # "the receiver does not answer this query", not as a failure.
        if str(error) != TIMEOUT_MESSAGE:
            raise
        _LOGGER.debug("NRI query timed out, assuming default max volume")
        return SUPPORTED_MAX_VOLUME

    # The parser is fed bytes rather than str (presumably because the
    # reply carries an XML encoding declaration -- TODO confirm).
    doc = etree.XML(dev_info.encode('utf-8'))

    # Guard every step of the lookup so an unexpected document shape falls
    # back to the default instead of raising IndexError/AttributeError.
    zonelist = doc[0].find('zonelist') if len(doc) else None
    if zonelist is None:
        return SUPPORTED_MAX_VOLUME

    main_zone = next(
        (zone for zone in zonelist.findall('zone')
         if int(zone.get('id')) == 1),
        None)
    if main_zone is None:
        return SUPPORTED_MAX_VOLUME

    try:
        return MAX_VOL_OPTS[main_zone.get('volmax')][main_zone.get('volstep')]
    except KeyError:
        return SUPPORTED_MAX_VOLUME
def determine_zones(receiver):
    """Probe the receiver for zone 2 and zone 3 support.

    Each zone is probed with one raw command.  A reply marks the zone as
    available; a ValueError whose text equals TIMEOUT_MESSAGE marks it as
    unavailable.  Any other ValueError is re-raised.

    Returns a dict with the boolean keys ``"zone2"`` and ``"zone3"``.
    """
    # (result key, raw command, message before probing, message on timeout)
    probes = (
        ("zone2", "ZPW",
         "Checking for zone 2 capability",
         "Zone 2 timed out, assuming no functionality"),
        ("zone3", "PW3",
         "Checking for zone 3 capability",
         "Zone 3 timed out, assuming no functionality"),
    )

    available = {key: False for key, _, _, _ in probes}
    for key, query, probing_msg, timeout_msg in probes:
        _LOGGER.debug(probing_msg)
        try:
            receiver.raw(query)
        except ValueError as error:
            if str(error) != TIMEOUT_MESSAGE:
                raise
            _LOGGER.debug(timeout_msg)
        else:
            available[key] = True
    return available
def setup_platform(hass, config, add_entities, discovery_info=None):
    """Set up the Onkyo platform.

    Registers the SERVICE_SELECT_HDMI_OUTPUT service, then creates the
    entities:

    * If CONF_HOST is configured and not yet in KNOWN_HOSTS, one
      OnkyoDevice is created for that host plus one OnkyoDeviceZone for
      each of zone 2 / zone 3 that determine_zones reports as available.
    * Otherwise every receiver returned by ``eISCP.discover()`` that is
      not yet in KNOWN_HOSTS gets an OnkyoDevice with default volume
      limits.

    The device maximum volume is taken from DEVICE_MAX_VOLUME when it is
    configured, and auto-detected with determine_max_volume otherwise.
    An OSError while talking to the configured host is logged and setup
    continues with whatever entities were created so far.
    """
    from eiscp import eISCP

    host = config.get(CONF_HOST)
    hosts = []

    def service_handle(service):
        """Apply a select-HDMI-output service call to the named entities."""
        if service.service != SERVICE_SELECT_HDMI_OUTPUT:
            return
        entity_ids = service.data.get(ATTR_ENTITY_ID)
        output = service.data.get(ATTR_HDMI_OUTPUT)
        for device in hosts:
            if device.entity_id in entity_ids:
                device.select_output(output)

    hass.services.register(
        DOMAIN, SERVICE_SELECT_HDMI_OUTPUT, service_handle,
        schema=ONKYO_SELECT_OUTPUT_SCHEMA)

    if CONF_HOST in config and host not in KNOWN_HOSTS:
        sources = config.get(CONF_SOURCES)
        try:
            receiver = eISCP(host)
            if DEVICE_MAX_VOLUME in config:
                # Honour the configured value instead of discarding it.
                # A value of 0 would make the entities divide by zero when
                # scaling the volume, so fall back to the default then.
                max_volume = (
                    config[DEVICE_MAX_VOLUME] or SUPPORTED_MAX_VOLUME)
            else:
                max_volume = determine_max_volume(receiver)

            hosts.append(OnkyoDevice(
                receiver,
                sources,
                name=config.get(CONF_NAME),
                max_volume=max_volume,
                supported_max_volume=max_volume,
            ))
            KNOWN_HOSTS.append(host)

            zones = determine_zones(receiver)
            # Add zone 2 and zone 3 entities where available.
            for zone in ("2", "3"):
                if not zones["zone{}".format(zone)]:
                    continue
                _LOGGER.debug("Setting up zone %s", zone)
                hosts.append(OnkyoDeviceZone(
                    zone, receiver,
                    sources,
                    name="{} Zone {}".format(config[CONF_NAME], zone),
                    max_volume=max_volume,
                    supported_max_volume=max_volume,
                ))
        except OSError:
            _LOGGER.error("Unable to connect to receiver at %s", host)
    else:
        for receiver in eISCP.discover():
            if receiver.host not in KNOWN_HOSTS:
                hosts.append(OnkyoDevice(receiver, config.get(CONF_SOURCES)))
                KNOWN_HOSTS.append(receiver.host)

    add_entities(hosts, True)
class OnkyoDevice(MediaPlayerDevice):
    """Representation of an Onkyo device."""

    def __init__(self, receiver, sources, name=None,
                 max_volume=DEF_MAX_VOLUME,
                 supported_max_volume=DEF_MAX_VOLUME):
        """Initialize the Onkyo Receiver.

        receiver -- connection object; must provide ``command`` and
            ``command_socket`` and, when no name is given, ``info``.
        sources -- dict mapping receiver source identifiers to the names
            shown to the user.
        name -- entity name; defaults to ``<model_name>_<identifier>``
            taken from ``receiver.info``.
        max_volume -- receiver volume value that a level of 1.0 maps to
            when setting the volume.
        supported_max_volume -- receiver volume value that is reported as
            level 1.0 when reading the volume.
        """
        self._receiver = receiver
        self._muted = False
        self._volume = 0
        self._pwstate = STATE_OFF
        self._name = name or '{}_{}'.format(
            receiver.info['model_name'], receiver.info['identifier'])
        self._max_volume = max_volume
        self._supported_max_volume = supported_max_volume
        self._current_source = None
        self._source_list = list(sources.values())
        self._source_mapping = sources
        self._reverse_mapping = {value: key for key, value in sources.items()}
        self._attributes = {}

    @staticmethod
    def _as_tuple(value):
        """Return ``value`` as a tuple; a lone string becomes a 1-tuple."""
        # The receiver library can hand back either a single string or a
        # tuple of strings; iterating a bare string would yield characters.
        if isinstance(value, str):
            return (value,)
        return tuple(value)

    def command(self, command):
        """Run an eiscp command and catch connection errors.

        Returns the receiver's result, or False when the command raised
        ValueError, OSError, AttributeError or AssertionError.  On such a
        failure an open ``command_socket`` is reset to None.
        """
        try:
            result = self._receiver.command(command)
        except (ValueError, OSError, AttributeError, AssertionError):
            if self._receiver.command_socket:
                self._receiver.command_socket = None
                _LOGGER.info("Resetting connection to %s", self._name)
            else:
                _LOGGER.info("%s is disconnected. Attempting to reconnect",
                             self._name)
            return False
        return result

    def update(self):
        """Get the latest state from the device.

        Nothing is changed when the power query fails.  When the device
        is not on, only the power state is updated.  Otherwise volume,
        mute, source and HDMI output are queried; source/mute/volume are
        only stored when all three queries succeeded.
        """
        status = self.command('system-power query')
        if not status:
            return
        if status[1] != 'on':
            self._pwstate = STATE_OFF
            return
        self._pwstate = STATE_ON

        volume_raw = self.command('volume query')
        mute_raw = self.command('audio-muting query')
        current_source_raw = self.command('input-selector query')
        hdmi_out_raw = self.command('hdmi-output-selector query')
        if not (volume_raw and mute_raw and current_source_raw):
            return

        # Use the first reported source name that is in the configured
        # mapping; otherwise fall back to all reported names joined by "_".
        source_names = self._as_tuple(current_source_raw[1])
        for source in source_names:
            if source in self._source_mapping:
                self._current_source = self._source_mapping[source]
                break
        else:
            self._current_source = '_'.join(source_names)

        self._muted = mute_raw[1] == 'on'
        self._volume = volume_raw[1] / float(self._supported_max_volume)

        if not hdmi_out_raw:
            return
        # Normalised first so a single string value is not split into
        # comma-separated characters.
        self._attributes["video_out"] = ','.join(
            self._as_tuple(hdmi_out_raw[1]))

    @property
    def name(self):
        """Return the name of the device."""
        return self._name

    @property
    def state(self):
        """Return the state of the device."""
        return self._pwstate

    @property
    def volume_level(self):
        """Return the volume level of the media player (0..1)."""
        return self._volume

    @property
    def is_volume_muted(self):
        """Return boolean indicating mute status."""
        return self._muted

    @property
    def supported_features(self):
        """Return media player features that are supported."""
        return SUPPORT_ONKYO

    @property
    def source(self):
        """Return the current input source of the device."""
        return self._current_source

    @property
    def source_list(self):
        """List of available input sources."""
        return self._source_list

    @property
    def device_state_attributes(self):
        """Return device specific state attributes."""
        return self._attributes

    def turn_off(self):
        """Turn the media player off."""
        self.command('system-power standby')

    def set_volume_level(self, volume):
        """Set volume level, input is range 0..1.

        The level is multiplied by the ``max_volume`` given to the
        constructor and truncated to an integer before being sent.
        """
        self.command('volume {}'.format(int(volume * self._max_volume)))

    def volume_up(self):
        """Increase volume by 1 step."""
        self.command('volume level-up')

    def volume_down(self):
        """Decrease volume by 1 step."""
        self.command('volume level-down')

    def volume_up_1db(self):
        """Increase volume by 1db step."""
        self.command('volume level-up-1db-step')

    def volume_down_1db(self):
        """Decrease volume by 1db step."""
        self.command('volume level-down-1db-step')

    def mute_volume(self, mute):
        """Mute (true) or unmute (false) media player."""
        if mute:
            self.command('audio-muting on')
        else:
            self.command('audio-muting off')

    def turn_on(self):
        """Turn the media player on."""
        self.command('system-power on')

    def select_source(self, source):
        """Set the input source.

        A configured display name is translated back to the receiver's
        source identifier; any other value is sent unchanged.
        """
        if source in self._source_list:
            source = self._reverse_mapping[source]
        self.command('input-selector {}'.format(source))

    def play_media(self, media_type, media_id, **kwargs):
        """Play radio station by preset number.

        Only acts when ``media_type`` is "radio" (case-insensitive) and
        the current source is one of DEFAULT_PLAYABLE_SOURCES.
        """
        # .get(): the current source may be None or a name that is not in
        # the configured mapping, which must not raise KeyError.
        source = self._reverse_mapping.get(self._current_source)
        if (media_type.lower() == 'radio' and
                source in DEFAULT_PLAYABLE_SOURCES):
            self.command('preset {}'.format(media_id))

    def select_output(self, output):
        """Set hdmi-out."""
        self.command('hdmi-output-selector={}'.format(output))
class OnkyoDeviceZone(OnkyoDevice):
    """Representation of an Onkyo device's extra zone."""

    def __init__(self, zone, receiver, sources, name=None,
                 max_volume=DEF_MAX_VOLUME,
                 supported_max_volume=DEF_MAX_VOLUME,):
        """Initialize the Zone with the zone identifier.

        zone -- zone identifier inserted into every command, e.g. "2".
        The remaining arguments are passed on to OnkyoDevice.
        """
        self._zone = zone
        self._supports_volume = True
        # The volume limits must be forwarded: the parent constructor
        # assigns both attributes itself and would otherwise reset them to
        # its own defaults.
        super().__init__(
            receiver, sources, name,
            max_volume=max_volume,
            supported_max_volume=supported_max_volume)

    def update(self):
        """Get the latest state from the device.

        Nothing is changed when the power query fails.  When the zone is
        not on, only the power state is updated.  Otherwise volume, mute
        and source are queried and stored when all three succeeded; the
        volume is only stored when the zone reports a numeric value.
        """
        status = self.command('zone{}.power=query'.format(self._zone))
        if not status:
            return
        if status[1] != 'on':
            self._pwstate = STATE_OFF
            return
        self._pwstate = STATE_ON

        volume_raw = self.command('zone{}.volume=query'.format(self._zone))
        mute_raw = self.command('zone{}.muting=query'.format(self._zone))
        current_source_raw = self.command(
            'zone{}.selector=query'.format(self._zone))

        # If we received a source value, but not a volume value
        # it's likely this zone permanently does not support volume.
        if current_source_raw and not volume_raw:
            self._supports_volume = False

        if not (volume_raw and mute_raw and current_source_raw):
            return

        # It's possible for some players to have zones set to HDMI with
        # no sound control. In this case, the string `N/A` is returned.
        self._supports_volume = isinstance(volume_raw[1], (float, int))

        # eiscp can return string or tuple. Make everything tuples.
        source_names = current_source_raw[1]
        if isinstance(source_names, str):
            source_names = (source_names,)

        # Use the first reported source name that is in the configured
        # mapping; otherwise fall back to all reported names joined by "_".
        for source in source_names:
            if source in self._source_mapping:
                self._current_source = self._source_mapping[source]
                break
        else:
            self._current_source = '_'.join(source_names)

        self._muted = mute_raw[1] == 'on'
        if self._supports_volume:
            self._volume = volume_raw[1] / float(self._supported_max_volume)

    @property
    def supported_features(self):
        """Return media player features that are supported."""
        if self._supports_volume:
            return SUPPORT_ONKYO
        return SUPPORT_ONKYO_WO_VOLUME

    def turn_off(self):
        """Turn the media player off."""
        self.command('zone{}.power=standby'.format(self._zone))

    def set_volume_level(self, volume):
        """Set volume level, input is range 0..1.

        The level is multiplied by the zone's ``max_volume`` and truncated
        to an integer before being sent.
        """
        self.command('zone{}.volume={}'.format(
            self._zone, int(volume * self._max_volume)))

    def volume_up(self):
        """Increase volume by 1 step."""
        self.command('zone{}.volume=level-up'.format(self._zone))

    def volume_down(self):
        """Decrease volume by 1 step."""
        self.command('zone{}.volume=level-down'.format(self._zone))

    def mute_volume(self, mute):
        """Mute (true) or unmute (false) media player."""
        if mute:
            self.command('zone{}.muting=on'.format(self._zone))
        else:
            self.command('zone{}.muting=off'.format(self._zone))

    def turn_on(self):
        """Turn the media player on."""
        self.command('zone{}.power=on'.format(self._zone))

    def select_source(self, source):
        """Set the input source.

        A configured display name is translated back to the receiver's
        source identifier; any other value is sent unchanged.
        """
        if source in self._source_list:
            source = self._reverse_mapping[source]
        self.command('zone{}.selector={}'.format(self._zone, source))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment