Skip to content

Instantly share code, notes, and snippets.

@olimpiurob
Created June 21, 2016 13:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save olimpiurob/673659c93f5750a92498397333ddc775 to your computer and use it in GitHub Desktop.
Save olimpiurob/673659c93f5750a92498397333ddc775 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
#
# Copyright 2014 Mikael Magnusson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# WIP Python module for Osram lightify
# Communicates with a gateway connected to the same LAN via TCP port 4000
# using a binary protocol
#
import binascii
import socket
import sys
import struct
import logging
__version__ = '1.0.2'
MODULE = __name__

# TCP port used to talk to the gateway.
PORT = 4000

# Command codes of the gateway's binary protocol.
# all light status (returns list of light address, light status, light name)
COMMAND_ALL_LIGHT_STATUS = 0x13
# group list (returns list of group id, and group name)
COMMAND_GROUP_LIST = 0x1e
# group status (returns group id, group name, and list of light addresses)
COMMAND_GROUP_INFO = 0x26
# set group luminance
COMMAND_LUMINANCE = 0x31
# set group onoff
COMMAND_ONOFF = 0x32
# set group temp
COMMAND_TEMP = 0x33
# set group colour
COMMAND_COLOUR = 0x36
# light status (returns light address and light status (?))
COMMAND_LIGHT_STATUS = 0x68
class Luminary(object):
    """Base class for anything that can be controlled through a connection.

    Every setter asks the connection object to build the matching command
    packet for this item, sends it, and then reads (and discards) the reply.
    """

    def __init__(self, conn, logger, name):
        self.__name = name
        self.__conn = conn
        self.__logger = logger

    def name(self):
        """Return the name given at construction time."""
        return self.__name

    def __roundtrip(self, packet):
        """Send *packet* over the connection and consume the reply."""
        link = self.__conn
        link.send(packet)
        link.recv()

    def set_onoff(self, on):
        """Send an on/off command for this item."""
        self.__roundtrip(self.__conn.build_onoff(self, on))

    def set_luminance(self, lum, time):
        """Send a luminance command for this item."""
        self.__roundtrip(self.__conn.build_luminance(self, lum, time))

    def set_temperature(self, temp, time):
        """Send a colour temperature command for this item."""
        self.__roundtrip(self.__conn.build_temp(self, temp, time))

    def set_rgb(self, r, g, b, time):
        """Send a colour command for this item."""
        self.__roundtrip(self.__conn.build_colour(self, r, g, b, time))
class Light(Luminary):
    """A single light, identified by its address, with a locally cached state.

    The cached values are filled in by ``update_status()`` and kept in step
    by the setters.  Until one of those has run, the getters return ``None``
    (state unknown) instead of raising ``AttributeError``.
    """

    def __init__(self, conn, logger, addr, name):
        super(Light, self).__init__(conn, logger, name)
        # Name mangling makes the base class' private attributes unreachable
        # from here, so this class keeps its own references.
        self.__logger = logger
        self.__conn = conn
        self.__addr = addr
        # Cached state; None means "not yet reported".
        self.__on = None
        self.__lum = None
        self.__temp = None
        self.__r = None
        self.__g = None
        self.__b = None

    def addr(self):
        """Return the address this light was created with."""
        return self.__addr

    def __str__(self):
        return "<light: %s>" % self.name()

    def update_status(self, on, lum, temp, r, g, b):
        """Overwrite the cached state; nothing is sent over the connection."""
        self.__on = on
        self.__lum = lum
        self.__temp = temp
        self.__r = r
        self.__g = g
        self.__b = b

    def on(self):
        """Return the cached on/off value (``None`` if unknown)."""
        return self.__on

    def set_onoff(self, on):
        """Send an on/off command and update the cached state."""
        self.__on = on
        super(Light, self).set_onoff(on)
        # `not lum` covers both 0 and the unknown (None) state.
        if not self.lum() and on != 0:
            self.__lum = 1  # This seems to be the default

    def lum(self):
        """Return the cached luminance (``None`` if unknown)."""
        return self.__lum

    def set_luminance(self, lum, time):
        """Send a luminance command and update the cached state.

        The cached on/off value follows the luminance: a positive value
        marks the light as on, zero marks it as off.
        """
        self.__lum = lum
        super(Light, self).set_luminance(lum, time)
        # `not self.__on` covers both 0 and the unknown (None) state.
        if lum > 0 and not self.__on:
            self.__on = 1
        elif lum == 0 and self.__on != 0:
            self.__on = 0

    def temp(self):
        """Return the cached colour temperature (``None`` if unknown)."""
        return self.__temp

    def set_temperature(self, temp, time):
        """Send a colour temperature command and update the cached state."""
        self.__temp = temp
        super(Light, self).set_temperature(temp, time)

    def rgb(self):
        """Return the cached colour as a ``(red, green, blue)`` tuple."""
        return (self.red(), self.green(), self.blue())

    def set_rgb(self, r, g, b, time):
        """Send a colour command and update the cached state."""
        self.__r = r
        self.__g = g
        self.__b = b
        super(Light, self).set_rgb(r, g, b, time)

    def red(self):
        """Return the cached red component (``None`` if unknown)."""
        return self.__r

    def green(self):
        """Return the cached green component (``None`` if unknown)."""
        return self.__g

    def blue(self):
        """Return the cached blue component (``None`` if unknown)."""
        return self.__b

    def build_command(self, command, data):
        """Build a command packet addressed to this light."""
        return self.__conn.build_light_command(command, self, data)
class Group(Luminary):
    """A named group of lights, identified by a group index.

    The members are stored as a list of light addresses.
    """

    def __init__(self, conn, logger, idx, name):
        super(Group, self).__init__(conn, logger, name)
        # Own references: the base class' private attributes are name-mangled.
        self.__logger = logger
        self.__conn = conn
        self.__idx = idx
        self.__lights = []

    def idx(self):
        """Return the group index."""
        return self.__idx

    def lights(self):
        """Return the list of member light addresses."""
        return self.__lights

    def set_lights(self, lights):
        """Replace the list of member light addresses."""
        self.__lights = lights

    def __str__(self):
        known = self.__conn.lights()
        members = []
        for address in self.lights():
            # Fall back to the hex address for lights the connection
            # does not know about.
            member = known[address] if address in known else "%x" % address
            members.append(str(member) + " ")
        return "<group: %s, lights: %s>" % (self.name(), "".join(members))

    def build_command(self, command, data):
        """Build a command packet addressed to this group."""
        return self.__conn.build_command(command, self, data)
class Lightify(object):
    """Connection to a gateway, speaking its binary protocol over TCP.

    Requests are ``<H6B`` headers (length, flag, command, 0, 0, 0x7,
    sequence number) followed by an optional address and payload.  Replies
    start with a 2-byte little-endian length; ``recv()`` returns the bytes
    that follow that length field.
    """

    def __init__(self, host):
        """Set up logging and state, then connect to *host*."""
        self.__logger = logging.getLogger(__name__)
        # Avoid stacking one more NullHandler per instance.
        if not any(isinstance(handler, logging.NullHandler)
                   for handler in self.__logger.handlers):
            self.__logger.addHandler(logging.NullHandler())
        self.__logger.setLevel(logging.DEBUG)
        self.__logger.info("Logging %s", __name__)
        self.__seq = 1
        self.__groups = {}
        self.__lights = {}
        self.__sock = None
        self.host = host
        self.connect()

    def connect(self):
        """(Re)open the TCP connection, closing any previous socket first.

        Raises:
            socket.error: if the connection cannot be established; in that
                case no socket is kept, so the next ``send()`` retries.
        """
        stale = self.__sock
        self.__sock = None
        if stale is not None:
            try:
                stale.close()
            except socket.error:
                # Best effort: the old socket is being discarded anyway.
                pass
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        try:
            sock.connect((self.host, PORT))
        except socket.error:
            sock.close()
            raise
        self.__sock = sock

    def groups(self):
        """Dict from group name to Group object."""
        return self.__groups

    def lights(self):
        """Dict from light addr to Light object."""
        return self.__lights

    def light_byname(self, name):
        """Return the first known light called *name*, or None."""
        self.__logger.debug(len(self.lights()))
        # .values() works on both Python 2 and 3 (itervalues() does not).
        for light in self.lights().values():
            if light.name() == name:
                return light
        return None

    def next_seq(self):
        """Advance and return the sequence number.

        The number is packed into a single byte of the header, so it wraps
        around at 256 instead of growing until ``struct.pack`` fails.
        """
        self.__seq = (self.__seq + 1) % 256
        return self.__seq

    @staticmethod
    def _as_bytes(data):
        """Return *data* as a byte string (text is encoded with cp437)."""
        if isinstance(data, bytes):
            return data
        return data.encode('cp437')

    @staticmethod
    def _decode_name(raw):
        """Strip NUL padding from a fixed-width name field."""
        try:
            return raw.replace('\0', "")
        except TypeError:
            # Decode using cp437 for python3. This is not UTF-8
            return raw.decode('cp437').replace('\0', "")

    def build_global_command(self, command, data):
        """Build a packet that is not addressed to a light or a group.

        Args:
            command: command code.
            data: payload (bytes, or text which is encoded with cp437).

        Returns:
            The complete packet as a byte string.
        """
        body = self._as_bytes(data)
        header = struct.pack(
            "<H6B",
            6 + len(body),
            0x02,
            command,
            0,
            0,
            0x7,
            self.next_seq()
        )
        return header + body

    def build_basic_command(self, flag, command, group_or_light, data):
        """Build a packet addressed to a light or a group.

        Args:
            flag: flag byte of the header.
            command: command code.
            group_or_light: the 8-byte packed address.
            data: payload (bytes, or text which is encoded with cp437).

        Returns:
            The complete packet as a byte string.
        """
        target = self._as_bytes(group_or_light)
        body = self._as_bytes(data)
        header = struct.pack(
            "<H6B",
            14 + len(body),
            flag,
            command,
            0,
            0,
            0x7,
            self.next_seq()
        )
        return header + target + body

    def build_command(self, command, group, data):
        """Build a packet addressed to *group* (flag 0x02)."""
        return self.build_basic_command(
            0x02,
            command,
            struct.pack("<8B", group.idx(), 0, 0, 0, 0, 0, 0, 0),
            data
        )

    def build_light_command(self, command, light, data):
        """Build a packet addressed to *light* (flag 0x00)."""
        return self.build_basic_command(
            0x00,
            command,
            struct.pack("<Q", light.addr()),
            data
        )

    def build_onoff(self, item, on):
        """Build an on/off packet for *item* (a light or a group)."""
        return item.build_command(COMMAND_ONOFF, struct.pack("<B", on))

    def build_temp(self, item, temp, time):
        """Build a colour temperature packet for *item*."""
        return item.build_command(COMMAND_TEMP, struct.pack("<HH", temp, time))

    def build_luminance(self, item, luminance, time):
        """Build a luminance packet for *item*."""
        return item.build_command(
            COMMAND_LUMINANCE,
            struct.pack("<BH", luminance, time)
        )

    def build_colour(self, item, red, green, blue, time):
        """Build a colour packet for *item*."""
        return item.build_command(
            COMMAND_COLOUR,
            struct.pack("<BBBBH", red, green, blue, 0xff, time)
        )

    def build_group_info(self, group):
        """Build the packet requesting information about *group*."""
        return self.build_command(COMMAND_GROUP_INFO, group, b"")

    def build_all_light_status(self, flag):
        """Build the packet requesting the status of all lights."""
        return self.build_global_command(
            COMMAND_ALL_LIGHT_STATUS,
            struct.pack("<B", flag)
        )

    def build_light_status(self, light):
        """Build the packet requesting the status of *light*."""
        return light.build_command(COMMAND_LIGHT_STATUS, b"")

    def build_group_list(self):
        """Build the packet requesting the list of groups."""
        return self.build_global_command(COMMAND_GROUP_LIST, b"")

    def group_list(self):
        """Query the gateway for its groups.

        Returns:
            Dict from group index to group name; empty when no reply was
            received.
        """
        groups = {}
        self.send(self.build_group_list())
        data = self.recv()
        if not data:
            return groups
        (num,) = struct.unpack("<H", data[7:9])
        self.__logger.debug('Num %d', num)
        for i in range(num):
            # Records are 18 bytes each and start at offset 9.
            (idx, raw_name) = struct.unpack_from("<H16s", data, 9 + i * 18)
            name = self._decode_name(raw_name)
            groups[idx] = name
            self.__logger.debug("Idx %d: '%s'", idx, name)
        return groups

    def update_group_list(self):
        """Rebuild the name -> Group dict from the gateway's group list."""
        groups = {}
        # .items() works on both Python 2 and 3 (iteritems() does not).
        for (idx, name) in self.group_list().items():
            group = Group(self, self.__logger, idx, name)
            group.set_lights(self.group_info(group))
            groups[name] = group
        self.__groups = groups

    def group_info(self, group):
        """Query the gateway for the members of *group*.

        Returns:
            List of light addresses; empty when no reply was received.
        """
        lights = []
        self.send(self.build_group_info(group))
        data = self.recv()
        if not data:
            return lights
        (idx, raw_name, num) = struct.unpack_from("<H16sB", data, 7)
        name = self._decode_name(raw_name)
        self.__logger.debug("Idx %d: '%s' %d", idx, name, num)
        for i in range(num):
            # 8-byte addresses follow the 19-byte group record.
            (addr,) = struct.unpack_from("<Q", data, 7 + 19 + i * 8)
            self.__logger.debug("%d: %x", i, addr)
            lights.append(addr)
        return lights

    def send(self, data):
        """Send a packet, reconnecting first if there is no socket.

        On a timeout or a dropped connection the socket is re-opened; the
        packet is NOT re-sent.
        """
        self.__logger.debug('sending "%s"', binascii.hexlify(data))
        if not self.__sock:
            self.connect()
        try:
            return self.__sock.sendall(data)
        except (socket.timeout, BrokenPipeError, ConnectionResetError):
            self.__logger.warning("send failed, reconnecting")
            return self.connect()

    def _recv_exact(self, count):
        """Read exactly *count* bytes from the socket.

        Returns:
            The bytes read, or None when the read timed out or the
            connection was dropped/closed (the socket is re-opened then).
        """
        chunks = []
        remaining = count
        while remaining > 0:
            try:
                chunk = self.__sock.recv(remaining)
            except (socket.timeout, BrokenPipeError, ConnectionResetError):
                self.connect()
                return None
            if not chunk:
                # An empty read means the peer closed the connection;
                # without this check the loop would never terminate.
                self.connect()
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def recv(self):
        """Read one complete reply.

        Returns:
            The bytes following the 2-byte length field, assembled from
            however many socket reads it takes, or None when the reply
            could not be read.
        """
        if not self.__sock:
            # Nothing can be pending without a socket; get ready for the
            # next command instead.
            self.connect()
            return None
        header = self._recv_exact(2)
        if header is None:
            return None
        (length,) = struct.unpack("<H", header)
        self.__logger.debug("Length %d", length)
        payload = self._recv_exact(length)
        if payload is None:
            return None
        self.__logger.debug('received "%s"', binascii.hexlify(payload))
        return payload

    def update_light_status(self, light):
        """Request the status of *light* and consume the reply.

        NOTE(review): the reply is read but deliberately not parsed, and
        None is returned.  The original code carried a disabled parser
        after an early ``return`` using the layout ``"<27x2BH4B16x"``
        (on, lum, temp, r, g, b, h); confirm that layout against a real
        gateway before enabling it.
        """
        self.send(self.build_light_status(light))
        self.recv()
        return

    def update_all_light_status(self):
        """Refresh the addr -> Light dict from the gateway.

        Known lights are reused and get their cached state updated; new
        ones are created.  When no reply is received the current dict is
        left untouched.
        """
        self.send(self.build_all_light_status(1))
        data = self.recv()
        if not data:
            return
        (num,) = struct.unpack("<H", data[7:9])
        self.__logger.debug('num: %d', num)
        old_lights = self.__lights
        new_lights = {}
        status_len = 50
        for i in range(num):
            pos = 9 + i * status_len
            payload = data[pos:pos + status_len]
            self.__logger.debug("%d %d %d", i, pos, len(payload))
            (a, addr, stat, raw_name, extra) = struct.unpack(
                "<HQ16s16sQ", payload)
            name = self._decode_name(raw_name)
            self.__logger.debug('light: %x %x %s %x', a, addr, name, extra)
            if addr in old_lights:
                light = old_lights[addr]
            else:
                light = Light(self, self.__logger, addr, name)
            # The leading 8-byte field gets its own name so that it is not
            # overwritten by the blue component in the same unpack.
            (lead, on, lum, temp, r, g, b, h) = struct.unpack("<Q2BH4B", stat)
            self.__logger.debug('status: %x %0x', lead, h)
            self.__logger.debug('onoff: %d', on)
            self.__logger.debug('temp: %d', temp)
            self.__logger.debug('lum: %d', lum)
            self.__logger.debug('red: %d', r)
            self.__logger.debug('green: %d', g)
            self.__logger.debug('blue: %d', b)
            light.update_status(on, lum, temp, r, g, b)
            new_lights[addr] = light
        self.__lights = new_lights
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment