Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
This script emulates a DreamScreen SideKick and displays the sector colors it receives in your terminal. It requires a terminal that supports 24-bit (true-color) ANSI escape sequences. More info can be found here: http://dreamscreen.boards.net/post/6271/thread
#!/usr/bin/env python
import enum
import types
import aioudp
import struct
import asyncio
import blessings
from datetime import datetime
# Credits: Largely based on https://github.com/HugoPeters/DreamStream
# Local address and port the datagram listener binds to.
DS_HOST = '0.0.0.0'
DS_PORT = 8888

# First byte of every packet; DreamScreenPackage rejects anything else.
DS_MAGIC = 0xFC

# Bit masks for the "flags" byte of a packet.
PKT_WRITE_CONSTANT = 0x01
PKT_WRITE = 0x02
PKT_RESPONSE_REQUEST = 0x10
PKT_BROADCAST = 0x20
PKT_RESPONSE = 0x40
# Lookup table for the 8-bit CRC computed by calculate_crc(): the running CRC
# is XOR-ed with the next data byte and the result indexes this 256-entry
# table to obtain the new CRC.
# NOTE(review): the entries look like the standard CRC-8 table for polynomial
# 0x07 (x^8 + x^2 + x + 1) -- confirm against the upstream DreamStream source.
CRC8 = [
0x00, 0x07, 0x0E, 0x09, 0x1C, 0x1B, 0x12, 0x15, 0x38, 0x3F, 0x36, 0x31,
0x24, 0x23, 0x2A, 0x2D, 0x70, 0x77, 0x7E, 0x79, 0x6C, 0x6B, 0x62, 0x65,
0x48, 0x4F, 0x46, 0x41, 0x54, 0x53, 0x5A, 0x5D, 0xE0, 0xE7, 0xEE, 0xE9,
0xFC, 0xFB, 0xF2, 0xF5, 0xD8, 0xDF, 0xD6, 0xD1, 0xC4, 0xC3, 0xCA, 0xCD,
0x90, 0x97, 0x9E, 0x99, 0x8C, 0x8B, 0x82, 0x85, 0xA8, 0xAF, 0xA6, 0xA1,
0xB4, 0xB3, 0xBA, 0xBD, 0xC7, 0xC0, 0xC9, 0xCE, 0xDB, 0xDC, 0xD5, 0xD2,
0xFF, 0xF8, 0xF1, 0xF6, 0xE3, 0xE4, 0xED, 0xEA, 0xB7, 0xB0, 0xB9, 0xBE,
0xAB, 0xAC, 0xA5, 0xA2, 0x8F, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9D, 0x9A,
0x27, 0x20, 0x29, 0x2E, 0x3B, 0x3C, 0x35, 0x32, 0x1F, 0x18, 0x11, 0x16,
0x03, 0x04, 0x0D, 0x0A, 0x57, 0x50, 0x59, 0x5E, 0x4B, 0x4C, 0x45, 0x42,
0x6F, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7D, 0x7A, 0x89, 0x8E, 0x87, 0x80,
0x95, 0x92, 0x9B, 0x9C, 0xB1, 0xB6, 0xBF, 0xB8, 0xAD, 0xAA, 0xA3, 0xA4,
0xF9, 0xFE, 0xF7, 0xF0, 0xE5, 0xE2, 0xEB, 0xEC, 0xC1, 0xC6, 0xCF, 0xC8,
0xDD, 0xDA, 0xD3, 0xD4, 0x69, 0x6E, 0x67, 0x60, 0x75, 0x72, 0x7B, 0x7C,
0x51, 0x56, 0x5F, 0x58, 0x4D, 0x4A, 0x43, 0x44, 0x19, 0x1E, 0x17, 0x10,
0x05, 0x02, 0x0B, 0x0C, 0x21, 0x26, 0x2F, 0x28, 0x3D, 0x3A, 0x33, 0x34,
0x4E, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5C, 0x5B, 0x76, 0x71, 0x78, 0x7F,
0x6A, 0x6D, 0x64, 0x63, 0x3E, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2C, 0x2B,
0x06, 0x01, 0x08, 0x0F, 0x1A, 0x1D, 0x14, 0x13, 0xAE, 0xA9, 0xA0, 0xA7,
0xB2, 0xB5, 0xBC, 0xBB, 0x96, 0x91, 0x98, 0x9F, 0x8A, 0x8D, 0x84, 0x83,
0xDE, 0xD9, 0xD0, 0xD7, 0xC2, 0xC5, 0xCC, 0xCB, 0xE6, 0xE1, 0xE8, 0xEF,
0xFA, 0xFD, 0xF4, 0xF3
]
class DEVICE_TYPE(enum.IntEnum):
    """Integer identifiers for the known device types."""

    DreamScreenHD = 1
    DreamScreen4K = 2
    SideKick = 3
class DEVICE_MODE(enum.IntEnum):
    """Integer identifiers for the device modes."""

    Sleep = 0
    Video = 1
    Music = 2
    Ambient = 3
class AMBIENT_MODE(enum.IntEnum):
    """Integer identifiers for the ambient mode types."""

    RGB = 0
    Scene = 1
class AMBIENT_SCENE(enum.IntEnum):
    """Integer identifiers for the ambient scenes."""

    RandomColor = 0
    Fireside = 1
    Twinkle = 2
    Ocean = 3
    Rainbow = 4
    July4th = 5
    Holiday = 6
    Pop = 7
    EnchantedForest = 8
class DeviceState:
    """Default values describing the state of the emulated device.

    Everything here is a class-level attribute, so the list-valued entries
    are shared between all instances unless an instance rebinds them.
    """

    # Device identity and mode.
    m_device_type = DEVICE_TYPE.SideKick
    m_brightness = 255
    m_ambient_mode_type = AMBIENT_MODE.RGB
    m_ambient_show_type = AMBIENT_SCENE.RandomColor
    m_fade_rate = 4
    m_group_number = 1
    m_mode = DEVICE_MODE.Ambient
    # Two-element tuple; NOTE(review): presumably (major, minor) -- confirm.
    m_esp_firmware = (0, 0)

    # Three-component color values.
    m_ambient_color = [255, 0, 0]
    m_saturation = [255, 255, 255]

    m_sector_assignment = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

    # Human-readable names.
    m_name = "DreamStream"
    m_group_name = "unassigned"
class PacketHeader:
    """Field names of a packet header, each defaulting to zero."""

    m_magic = m_length = m_groupaddr = 0
    m_flags = m_cmd_upper = m_cmd_lower = 0
# Maps an (upper, lower) command byte pair to the name of the command.
# DreamScreenPackage.command looks received packets up in this table, and
# DreamScreenPackage.execute() dispatches to the protocol method carrying the
# same name, so an entry only has an effect when such a method exists.
# Source:
# https://github.com/HugoPeters/DreamStream/blob/master/src/esp32/esp32.ino
COMMANDS = {
(0xFF, 0xFF): 'INVALID',
(0x01, 0x05): 'ResetESP',
(0x01, 0x07): 'Name',
(0x01, 0x08): 'GroupName',
(0x01, 0x09): 'GroupNumber',
(0x01, 0x0A): 'CurrentState',
(0x01, 0x0B): 'Ping',
(0x01, 0x0C): 'SubscribeToSectorData',
(0x01, 0x15): 'ReadBootloaderMode',
(0x02, 0x02): 'ReadPICVersionNumber',
(0x02, 0x03): 'ReadDiagnostics',
(0x03, 0x01): 'Mode',
(0x03, 0x02): 'Brightness',
(0x03, 0x03): 'Zones',
(0x03, 0x04): 'ZonesBrightness',
(0x03, 0x05): 'AmbientColor',
(0x03, 0x06): 'Saturation',
(0x03, 0x08): 'AmbientModeType',
(0x03, 0x09): 'MusicModeType',
(0x03, 0x0A): 'MusicModeColors',
(0x03, 0x0B): 'MusicModeWeights',
(0x03, 0x0C): 'MinimumLuminosity',
(0x03, 0x0D): 'AmbientScene',
(0x03, 0x13): 'IndicatorLightAutoOff',
(0x03, 0x14): 'USBPowerEnable',
(0x03, 0x16): 'SectorData',
(0x03, 0x17): 'SectorAssignment',
(0x03, 0x18): 'SectorBroadcastControl',
(0x03, 0x19): 'SectorBroadcastTiming',
(0x03, 0x20): 'HDMIInput',
(0x03, 0x21): 'MusicModeSource',
(0x03, 0x23): 'HDMIInputName1',
(0x03, 0x24): 'HDMIInputName2',
(0x03, 0x25): 'HDMIInputName3',
(0x03, 0x26): 'CECPassthroughEnable',
(0x03, 0x27): 'CECSwitchingEnable',
(0x03, 0x28): 'HPDEnable',
(0x03, 0x2A): 'VideoFrameDelay',
(0x03, 0x2B): 'LetterboxingEnable',
(0x03, 0x2C): 'HDMIActiveChannels',
(0x03, 0x2D): 'ColorBoost',
(0x03, 0x2E): 'CECPowerEnable',
(0x03, 0x2F): 'PillarboxingEnable',
(0x03, 0x40): 'SKUSetup',
(0x03, 0x41): 'FlexSetup',
(0x03, 0x60): 'HDRToneRemapping',
(0x04, 0x01): 'BootloaderFlags',
(0x04, 0x02): 'ResetPIC',
(0x04, 0x0D): 'ESPConnectedToWiFi',
(0x04, 0x14): 'OtherConnectedToWiFi',
}
def calculate_crc(data):
    """Return the 8-bit CRC of *data*, computed with the CRC8 lookup table.

    *data* may be anything ``bytearray()`` accepts (bytes, bytearray, an
    iterable of ints in ``range(256)``). Empty input yields 0.
    """
    checksum = 0x00
    for byte in bytearray(data):
        # Fold the next byte into the running CRC via the lookup table.
        checksum = CRC8[(checksum ^ byte) & 0xFF]
    return checksum
class DreamScreenPackage:
    """A single DreamScreen UDP packet.

    Wire layout, as parsed by ``__init__`` and produced by ``send``::

        byte 0       magic (DS_MAGIC)
        byte 1       length, i.e. len(payload) + 5
        byte 2       group
        byte 3       flags
        byte 4       upper command byte
        byte 5       lower command byte
        bytes 6..-2  payload
        byte -1      CRC over all preceding bytes (see calculate_crc)
    """

    def __init__(self, data, addr):
        """Parse and validate a received datagram.

        Args:
            data: The raw datagram bytes.
            addr: The sender address; stored so a reply can be sent back.

        Raises:
            TypeError: If the datagram is shorter than 7 bytes, does not
                start with DS_MAGIC, has a payload whose size disagrees with
                the length byte, or (for non-empty payloads) has a wrong CRC.
        """
        if len(data) < 7:
            raise TypeError(f'Package from {addr} too short: {data}')
        elif data[0] != DS_MAGIC:
            raise TypeError(f'Wrong magic byte from {addr}: {data}')

        self.addr = addr
        # 'x' skips the magic byte that was checked above.
        (self.length, self.group, self.flags, self.upper_command,
         self.lower_command) = struct.unpack('xBBBBB', data[:6])
        # Order matters: the payload setter needs self.length and the crc
        # setter needs the payload and the header fields.
        self.payload = data[6:-1]
        self.crc = data[-1]

    def __repr__(self):
        return (
            f'<{self.__class__.__name__}[{self.command}]: g: {self.group} '
            f'{self.payload}>')

    @property
    def payload(self):
        """The payload bytes of the packet."""
        return self._payload

    @payload.setter
    def payload(self, payload):
        # Explicit raise instead of assert: this validates network input, must
        # survive ``python -O`` and must be catchable as TypeError like the
        # other parse errors.
        if len(payload) != self.length - 5:
            raise TypeError(
                f'Payload of {len(payload)} bytes does not match length '
                f'byte {self.length}')
        self._payload = payload

    @property
    def command(self):
        """The command name from COMMANDS, or an 'Unknown command' string."""
        code = self.upper_command, self.lower_command
        return COMMANDS.get(code, 'Unknown command: %d:%d' % code)

    @property
    def crc(self):
        """The CRC over header and payload, as carried in the last byte."""
        return calculate_crc(self.get_data() + self.payload)

    @crc.setter
    def crc(self, crc):
        """Verify a received CRC byte; nothing is stored.

        Raises:
            TypeError: If *crc* does not match the computed CRC.
        """
        if self.payload is None:
            raise TypeError('Payload needs to be set first')

        # NOTE(review): packets without payload are not CRC-checked. This is
        # preserved from the original code -- confirm whether it is intended.
        if self.payload:
            expected_crc = self.crc
            if expected_crc != crc:
                message = f'{crc} is not correct for {self.payload!r}'
                message += f', got: {expected_crc}'
                raise TypeError(message)

    def execute(self, protocol):
        """Schedule the protocol coroutine named after this packet's command.

        Prints a notice when *protocol* has no attribute with that name.
        """
        command = getattr(protocol, self.command, None)
        if command:
            asyncio.ensure_future(command(self), loop=protocol.loop)
        else:
            print('no command for', self)

    async def send(self, remote, payload=b'', **kwargs):
        """Send a packet with this package's header fields through *remote*.

        Args:
            remote: Object whose ``send(bytes)`` method transmits the packet.
            payload: Bytes-like object, a list/tuple of ints, or a single int.
            **kwargs: Attributes to set on this package before building the
                header (e.g. ``flags=...``). The package is modified in place.
        """
        if isinstance(payload, (bytes, bytearray, memoryview)):
            payload = bytes(payload)
        elif isinstance(payload, (list, tuple)):
            payload = bytes(payload)
        else:
            payload = bytes([payload])

        for k, v in kwargs.items():
            setattr(self, k, v)

        header = self.get_data(payload)
        # Plain concatenation: struct.pack('s', payload) would emit exactly
        # one byte, truncating longer payloads and padding an empty one with
        # a NUL byte.
        checksum = calculate_crc(header + payload)
        remote.send(bytes(header) + payload + bytes([checksum]))

    def get_data(self, payload=None):
        """Return the six header bytes for *payload* as a bytearray.

        Args:
            payload: Payload the length byte should describe. ``None`` (the
                default) means the package's own payload; an explicitly
                passed empty payload is honoured.

        Raises:
            ValueError: If the payload is too long for the length byte.
        """
        if payload is None:
            payload = self.payload
        return bytearray((
            DS_MAGIC,
            len(payload) + 5,
            self.group,
            self.flags,
            self.upper_command,
            self.lower_command,
        ))
class DreamScreenProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that parses DreamScreen packets and draws sectors.

    Every received datagram is parsed into a DreamScreenPackage, which then
    dispatches to the coroutine method of this class that is named after the
    packet's command (``SubscribeToSectorData``, ``SectorData``).
    """

    # Number of sector colors the layout drawn by SectorData indexes (0..11).
    SECTOR_COUNT = 12

    def __init__(self, loop, term=None):
        """Create the protocol.

        Args:
            loop: Event loop used to schedule command coroutines.
            term: Optional ``blessings.Terminal`` used for cursor movement; a
                new one is created when omitted, so the class does not rely
                on a module-level ``term`` being defined.
        """
        self.loop = loop
        self.remotes = {}
        super().__init__()
        self.term = term if term is not None else blessings.Terminal()
        self.last_update = datetime.now()
        print()

    async def get_remote(self, address):
        """Return the remote endpoint for *address*, opening it on first use."""
        if address not in self.remotes:
            self.remotes[address] = await aioudp.open_remote_endpoint(*address)
        return self.remotes[address]

    def connection_made(self, transport):
        print('transport', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        """Parse a datagram and run its command; malformed ones are dropped."""
        try:
            package = DreamScreenPackage(data, addr)
        except (TypeError, AssertionError) as e:
            # Malformed datagrams are reported as TypeError; AssertionError is
            # caught as well in case package validation uses assert statements.
            print(e)
            return
        package.execute(self)

    def error_received(self, exc):
        print('error', exc)

    @classmethod
    async def listen(cls, loop):
        """Bind to (DS_HOST, DS_PORT); return the (transport, protocol) pair."""
        return await loop.create_datagram_endpoint(
            lambda: cls(loop), local_addr=(DS_HOST, DS_PORT))

    async def SubscribeToSectorData(self, package):
        """Answer a subscription packet with the response-request flag set."""
        await self.send(package, flags=PKT_RESPONSE_REQUEST, payload=0x01)

    async def SectorData(self, package):
        """Draw the sector colors of *package* and the current update rate.

        The payload is read as consecutive (red, green, blue) byte triples.
        Payloads that are not a whole number of triples, or that hold fewer
        than SECTOR_COUNT colors, are reported and ignored.
        """
        payload = package.payload
        if len(payload) % 3 or len(payload) < 3 * self.SECTOR_COUNT:
            print(f'Ignoring sector data of {len(payload)} bytes '
                  f'from {package.addr}')
            return

        colors = [
            Color(index, red, green, blue)
            for index, (red, green, blue)
            in enumerate(struct.iter_unpack('BBB', payload))
        ]

        emit = self._print_line
        # Move the cursor back up over the lines of the previous frame (the
        # eight lines printed below plus the newline of this print).
        print(self.term.move_up * 9)
        emit('colors: ', colors)
        emit(80 * ' ')
        emit(colors[i] for i in (7, 6, 5, 4, 3))
        emit(f'{colors[8]} {colors[2]}')
        emit(colors[i] for i in (9, 10, 11, 0, 1))
        emit(80 * ' ')

        last_update = datetime.now()
        duration = (last_update - self.last_update).total_seconds()
        self.last_update = last_update
        # Two packets can carry the same timestamp; avoid dividing by zero.
        rate = 1 / duration if duration > 0 else float('inf')
        emit('%.3f :: %s' % (rate, duration))
        emit(80 * ' ')

    @staticmethod
    def _print_line(*args):
        """Print all arguments on one line, flattening lists and generators."""
        parts = []
        for arg in args:
            if isinstance(arg, (types.GeneratorType, list, tuple)):
                parts.extend(map(str, arg))
            else:
                parts.append(str(arg))
        print(''.join(parts))

    async def send(self, package, *args, **kwargs):
        """Send *package* back to the address it was received from."""
        remote = await self.get_remote(package.addr)
        await package.send(remote, *args, **kwargs)
class Color:
    """An indexed RGB color that renders as a colored terminal cell."""

    def __init__(self, index, red, green, blue):
        self.index, self.red, self.green, self.blue = index, red, green, blue

    def __str__(self):
        """Return the index centered in five columns on an RGB background."""
        # ANSI SGR sequence selecting a 24-bit background color.
        set_background = f'\033[48;2;{self.red};{self.green};{self.blue}m'
        label = f'{self.index:^5}'
        # Reset all attributes so the color does not bleed into later text.
        return set_background + label + '\033[0m'
def main(loop):
    """Start the DreamScreen listener on *loop* and run until it stops.

    The listening transport is closed when the loop stops or an exception
    (including KeyboardInterrupt) propagates.
    """
    # NOTE(review): presumably reserves room for the in-place display --
    # confirm against the cursor movement done while rendering.
    print('\n' * 6)

    listener = DreamScreenProtocol.listen(loop)
    transport, _protocol = loop.run_until_complete(listener)
    try:
        loop.run_forever()
    finally:
        transport.close()
if __name__ == '__main__':
    # Module-level terminal object; kept global because rendering code may
    # look it up by this name. Created first so that a failure here cannot
    # leave an unclosed event loop behind.
    term = blessings.Terminal()

    # asyncio.get_event_loop() is deprecated when no loop is running and
    # raises RuntimeError on recent Python versions, so create the loop
    # explicitly and install it as the current one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        main(loop)
    finally:
        loop.close()
@WoLpH

This comment has been minimized.

Copy link
Owner Author

commented Mar 3, 2019

Example screenshot

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.