Skip to content

Instantly share code, notes, and snippets.

@lad1337
Created April 11, 2016 18:01
Show Gist options
  • Save lad1337/5838561bc4c34cd5fe420c4e0c8e16c4 to your computer and use it in GitHub Desktop.
Save lad1337/5838561bc4c34cd5fe420c4e0c8e16c4 to your computer and use it in GitHub Desktop.
A simple CEC client built on top of the libcec Python bindings (`cec`).
import cec
class PowerStatus:
    """Printable, truth-testable wrapper around a device's power state.

    ``str()`` yields ``'on'`` or ``'standby'``; ``bool()`` is true when the
    wrapped value is truthy.

    Args:
        power_on: Truthy when the device is powered on.
    """

    def __init__(self, power_on):
        self.power_on = power_on

    def __str__(self):
        return 'on' if self.power_on else 'standby'

    def __repr__(self):
        return '< Power: %s >' % self

    def __bool__(self):
        # __bool__ is required to return a real bool; coerce so that truthy
        # non-bool values (e.g. 1) do not make bool(self) raise TypeError.
        return bool(self.power_on)
import logging
class CECClient:
    """Small convenience wrapper around a libcec adapter connection.

    Commands are built as libcec command strings of the form
    ``"<src><dst>:<opcode>[:<operand>...]"`` (all hex) and sent through
    :meth:`raw_command`.

    Attributes:
        connection: Adapter object returned by ``cec.ICECAdapter.Create``.
        connected: Return value of the last ``connection.Open`` call
            (``False`` until :meth:`init` has run).
    """

    def __init__(self, osd_name=None, device_types=None, init=True):
        """Build the libcec configuration and optionally open an adapter.

        Args:
            osd_name: Stored as the configuration's ``strDeviceName``;
                defaults to ``"py cec client"``.
            device_types: Iterable of device type constants to register;
                defaults to a single ``cec.CEC_DEVICE_TYPE_PLAYBACK_DEVICE``.
            init: When true, :meth:`init` is called immediately.

        Raises:
            ConnectionError: If ``init`` is true and no adapter is detected.
        """
        self.logger = logging.getLogger('CECClient')
        self.logger_bus = logging.getLogger('CECClient.bus')

        self.cecconfig = cec.libcec_configuration()
        self.cecconfig.strDeviceName = osd_name or "py cec client"
        self.cecconfig.bActivateSource = 0
        self.cecconfig.clientVersion = cec.LIBCEC_VERSION_CURRENT
        self.cecconfig.SetLogCallback(self.log_callback)
        if device_types:
            for device_type in device_types:
                self.cecconfig.deviceTypes.Add(device_type)
        else:
            self.cecconfig.deviceTypes.Add(cec.CEC_DEVICE_TYPE_PLAYBACK_DEVICE)

        self.connection = cec.ICECAdapter.Create(self.cecconfig)
        self.connected = False
        self._logical_address = None
        self._devices = None
        if init:
            self.init()

    def init(self):
        """Open the first detected adapter and record the result.

        Resets the cached logical address so it is re-read after opening.

        Raises:
            ConnectionError: If no adapter is detected.
        """
        self._logical_address = None
        adapters = self.connection.DetectAdapters()
        if not adapters:
            raise ConnectionError('No Adapters found')
        self.connected = self.connection.Open(adapters[0].strComName)

    def log_callback(self, level, time, message):
        """Forward a libcec log line to the ``CECClient.bus`` logger.

        Args:
            level: libcec log level constant (``cec.CEC_LOG_*``).
            time: Timestamp supplied by libcec (unused).
            message: Log text.
        """
        log_level_map = {
            cec.CEC_LOG_ERROR: logging.ERROR,
            cec.CEC_LOG_WARNING: logging.WARNING,
            cec.CEC_LOG_NOTICE: logging.INFO,
            cec.CEC_LOG_TRAFFIC: logging.DEBUG,
            cec.CEC_LOG_DEBUG: logging.DEBUG,
        }
        # Unmapped levels fall back to DEBUG so an unexpected value cannot
        # raise KeyError from inside the library callback.
        self.logger_bus.log(log_level_map.get(level, logging.DEBUG), message)

    @property
    def logical_address(self):
        """Primary logical address of this client (fetched once, then cached)."""
        if self._logical_address is None:
            self._logical_address = self.connection.GetLogicalAddresses().primary
        return self._logical_address

    @property
    def devices(self):
        """Mapping of logical address to device info; scans on first access."""
        if self._devices is None:
            self._devices = self.scan()
        return self._devices

    def raw_command(self, data):
        """Parse a libcec command string and transmit it.

        Args:
            data: Command string, e.g. ``"40:36"``.

        Returns:
            Whatever ``connection.Transmit`` returns.
        """
        self.logger.debug('Sending command: %s', data)
        cmd = self.connection.CommandFromString(data)
        return self.connection.Transmit(cmd)

    def scan(self):
        """Query every active device on logical addresses 0-14.

        The result is also stored as the cache behind :attr:`devices`.

        Returns:
            dict mapping logical address to a dict with the keys
            ``vendor_id``, ``phisical_address`` (sic), ``logical_address``,
            ``active``, ``cec_version``, ``power_status`` and ``osd_name``.
        """
        addresses = self.connection.GetActiveDevices()
        devices = {}
        for address in range(15):
            if not addresses.IsSet(address):
                continue
            devices[address] = {
                'vendor_id': self.connection.GetDeviceVendorId(address),
                'phisical_address': int(str(self.connection.GetDevicePhysicalAddress(address))),
                'logical_address': address,
                'active': self.connection.IsActiveSource(address),
                'cec_version': self.connection.GetDeviceCecVersion(address),
                # A power status of 0 is treated as "on".
                'power_status': PowerStatus(self.connection.GetDevicePowerStatus(address) == 0),
                'osd_name': self.connection.GetDeviceOSDName(address),
            }
        self._devices = devices
        return devices

    def button_release(self, dst, src=None):
        """Send opcode 0x45 (User Control Released) to ``dst``.

        Args:
            dst: Destination logical address.
            src: Source logical address; defaults to :attr:`logical_address`.
        """
        if src is None:
            src = self.logical_address
        return self.raw_command('{src:x}{dst:x}:45'.format(src=src, dst=dst))

    def _button_press(self, button, dst, release, src=None, ui_command='00'):
        """Send ``<button>:<ui_command>`` to ``dst``, optionally followed by a release.

        Args:
            button: Opcode as a hex string (``"44"`` is User Control Pressed).
            dst: Destination logical address.
            release: When true, follow up with :meth:`button_release`.
            src: Source logical address; defaults to :attr:`logical_address`.
            ui_command: Operand as a hex string; ``"00"`` is Select.

        Returns:
            Truthy only if every transmitted frame was reported as sent.
        """
        if src is None:
            src = self.logical_address
        result_press = self.raw_command(
            '{src:x}{dst:x}:{button}:{ui_command}'.format(
                src=src, dst=dst, button=button, ui_command=ui_command))
        result_release = True
        if release:
            # Release from the same source address the press was sent from.
            result_release = self.button_release(dst, src=src)
        return result_press and result_release

    def button_menu(self, dst, release=True):
        """Press (and by default release) the Root Menu button on ``dst``."""
        # User Control Pressed (0x44) with UI command 0x09 (Root Menu);
        # 0x45 is the *release* opcode and must not be used for the press.
        return self._button_press("44", dst, release, ui_command="09")

    def button_select(self, dst, release=True):
        """Press (and by default release) the Select button on ``dst``."""
        return self._button_press("44", dst, release)

    def all_standby(self, src=None, dst=None):
        """Send opcode 0x36 (Standby).

        Args:
            src: Source logical address; defaults to :attr:`logical_address`.
            dst: Destination logical address; defaults to 0.
        """
        if src is None:
            src = self.logical_address
        if dst is None:
            dst = 0
        return self.raw_command('{src:x}{dst:x}:36'.format(src=src, dst=dst))

    def active_source(self, logical_address=None, phisical_address=None):
        """Broadcast opcode 0x82 (Active Source) for a physical address.

        Args:
            logical_address: Device whose physical address is looked up in
                :attr:`devices` when ``phisical_address`` is not given.
            phisical_address: 16-bit physical address; takes precedence.

        Raises:
            ValueError: If neither argument is given.
            KeyError: If ``logical_address`` is not a known device.
        """
        if logical_address is None and phisical_address is None:
            raise ValueError('no logical_address nor phisical_address given')
        if phisical_address is None:
            phisical_address = self.devices[logical_address]['phisical_address']
        digits = '{:04x}'.format(phisical_address)
        # Two operand bytes, two hex digits each: 0x1000 -> "10:00".
        target = digits[0:2] + ':' + digits[2:4]
        return self.raw_command(
            '{src:x}F:82:{target}'.format(src=self.logical_address, target=target))
# Demo run, executed at module level (i.e. whenever this file is run or
# imported): configure root logging, create a client (which opens the first
# detected adapter because ``init`` defaults to True), switch the client's
# logger to DEBUG, then print this client's logical address and the result
# of a device scan.
logging.basicConfig()
c = CECClient()
c.logger.setLevel(logging.DEBUG)
print(c.logical_address)
print(c.scan())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment