Last active
January 16, 2024 20:12
-
-
Save wolph/ff13e3a78387dae56967c92f72960117 to your computer and use it in GitHub Desktop.
This script emulates the Dreamscreen sidekick and displays the output on your screen. It requires a shell that supports full-rgb ansi. More info can be found here: http://dreamscreen.boards.net/post/6271/thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import enum | |
import types | |
import aioudp | |
import struct | |
import asyncio | |
import blessings | |
from datetime import datetime | |
# Credits: Largely based on https://github.com/HugoPeters/DreamStream | |
# Local address and UDP port the emulator binds to (see
# DreamScreenProtocol.listen).
DS_HOST = '0.0.0.0'
DS_PORT = 8888

# Value required in the first byte of every packet; DreamScreenPackage rejects
# datagrams that start with anything else.
DS_MAGIC = 0xFC

# Single-bit masks for the flags byte of the packet header.
PKT_WRITE_CONSTANT = 0x01
PKT_WRITE = 0x02
PKT_RESPONSE_REQUEST = 0x10
PKT_BROADCAST = 0x20
PKT_RESPONSE = 0x40
# Byte-indexed lookup table used by calculate_crc(): entry ``i`` is the CRC-8
# (polynomial 0x07, MSB first) of the single byte ``i``.
CRC8 = [
    0x00, 0x07, 0x0E, 0x09, 0x1C, 0x1B, 0x12, 0x15, 0x38, 0x3F, 0x36, 0x31,
    0x24, 0x23, 0x2A, 0x2D, 0x70, 0x77, 0x7E, 0x79, 0x6C, 0x6B, 0x62, 0x65,
    0x48, 0x4F, 0x46, 0x41, 0x54, 0x53, 0x5A, 0x5D, 0xE0, 0xE7, 0xEE, 0xE9,
    0xFC, 0xFB, 0xF2, 0xF5, 0xD8, 0xDF, 0xD6, 0xD1, 0xC4, 0xC3, 0xCA, 0xCD,
    0x90, 0x97, 0x9E, 0x99, 0x8C, 0x8B, 0x82, 0x85, 0xA8, 0xAF, 0xA6, 0xA1,
    0xB4, 0xB3, 0xBA, 0xBD, 0xC7, 0xC0, 0xC9, 0xCE, 0xDB, 0xDC, 0xD5, 0xD2,
    0xFF, 0xF8, 0xF1, 0xF6, 0xE3, 0xE4, 0xED, 0xEA, 0xB7, 0xB0, 0xB9, 0xBE,
    0xAB, 0xAC, 0xA5, 0xA2, 0x8F, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9D, 0x9A,
    0x27, 0x20, 0x29, 0x2E, 0x3B, 0x3C, 0x35, 0x32, 0x1F, 0x18, 0x11, 0x16,
    0x03, 0x04, 0x0D, 0x0A, 0x57, 0x50, 0x59, 0x5E, 0x4B, 0x4C, 0x45, 0x42,
    0x6F, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7D, 0x7A, 0x89, 0x8E, 0x87, 0x80,
    0x95, 0x92, 0x9B, 0x9C, 0xB1, 0xB6, 0xBF, 0xB8, 0xAD, 0xAA, 0xA3, 0xA4,
    0xF9, 0xFE, 0xF7, 0xF0, 0xE5, 0xE2, 0xEB, 0xEC, 0xC1, 0xC6, 0xCF, 0xC8,
    0xDD, 0xDA, 0xD3, 0xD4, 0x69, 0x6E, 0x67, 0x60, 0x75, 0x72, 0x7B, 0x7C,
    0x51, 0x56, 0x5F, 0x58, 0x4D, 0x4A, 0x43, 0x44, 0x19, 0x1E, 0x17, 0x10,
    0x05, 0x02, 0x0B, 0x0C, 0x21, 0x26, 0x2F, 0x28, 0x3D, 0x3A, 0x33, 0x34,
    0x4E, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5C, 0x5B, 0x76, 0x71, 0x78, 0x7F,
    0x6A, 0x6D, 0x64, 0x63, 0x3E, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2C, 0x2B,
    0x06, 0x01, 0x08, 0x0F, 0x1A, 0x1D, 0x14, 0x13, 0xAE, 0xA9, 0xA0, 0xA7,
    0xB2, 0xB5, 0xBC, 0xBB, 0x96, 0x91, 0x98, 0x9F, 0x8A, 0x8D, 0x84, 0x83,
    0xDE, 0xD9, 0xD0, 0xD7, 0xC2, 0xC5, 0xCC, 0xCB, 0xE6, 0xE1, 0xE8, 0xEF,
    0xFA, 0xFD, 0xF4, 0xF3,
]
class DEVICE_TYPE(enum.IntEnum):
    """Numeric identifiers for the device kinds known to this script."""

    DreamScreenHD = 1
    DreamScreen4K = 2
    SideKick = 3
class DEVICE_MODE(enum.IntEnum):
    """Numeric identifiers for the device's operating modes."""

    Sleep = 0
    Video = 1
    Music = 2
    Ambient = 3
class AMBIENT_MODE(enum.IntEnum):
    """Numeric identifiers for the two ambient mode types."""

    RGB = 0
    Scene = 1
class AMBIENT_SCENE(enum.IntEnum):
    """Numeric identifiers for the selectable ambient scenes."""

    RandomColor = 0
    Fireside = 1
    Twinkle = 2
    Ocean = 3
    Rainbow = 4
    July4th = 5
    Holiday = 6
    Pop = 7
    EnchantedForest = 8
class DeviceState:
    """Default state values for the emulated device.

    The class attributes are the defaults and remain readable directly on the
    class. The list-valued ones are copied onto each instance in ``__init__``
    so that mutating one instance's colour, saturation or sector assignment
    does not silently change the class defaults or any other instance.
    """

    m_device_type = DEVICE_TYPE.SideKick
    m_brightness = 255
    m_ambient_mode_type = AMBIENT_MODE.RGB
    m_ambient_show_type = AMBIENT_SCENE.RandomColor
    m_fade_rate = 4
    m_group_number = 1
    m_mode = DEVICE_MODE.Ambient
    m_esp_firmware = 0, 0
    m_ambient_color = [0xFF, 0, 0]
    m_saturation = [0xFF, 0xFF, 0xFF]
    m_sector_assignment = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    m_name = "DreamStream"
    m_group_name = "unassigned"

    def __init__(self):
        """Give this instance private copies of the mutable defaults."""
        defaults = type(self)
        self.m_ambient_color = list(defaults.m_ambient_color)
        self.m_saturation = list(defaults.m_saturation)
        self.m_sector_assignment = list(defaults.m_sector_assignment)
class PacketHeader:
    """Zero-initialised defaults for the six one-byte packet header fields.

    The fields appear in the same order DreamScreenPackage reads them from
    the wire: magic, length, group address, flags, then the two command bytes.
    """

    m_magic = 0
    m_length = 0
    m_groupaddr = 0
    m_flags = 0
    m_cmd_upper = 0
    m_cmd_lower = 0
# Maps the (upper, lower) command byte pair of a packet to the command name.
# DreamScreenPackage.execute() dispatches to the protocol method with that
# name, so the names double as handler method names.
#
# Source:
# https://github.com/HugoPeters/DreamStream/blob/master/src/esp32/esp32.ino
COMMANDS = {
    (0xFF, 0xFF): 'INVALID',

    # Upper byte 0x01
    (0x01, 0x05): 'ResetESP',
    (0x01, 0x07): 'Name',
    (0x01, 0x08): 'GroupName',
    (0x01, 0x09): 'GroupNumber',
    (0x01, 0x0A): 'CurrentState',
    (0x01, 0x0B): 'Ping',
    (0x01, 0x0C): 'SubscribeToSectorData',
    (0x01, 0x15): 'ReadBootloaderMode',

    # Upper byte 0x02
    (0x02, 0x02): 'ReadPICVersionNumber',
    (0x02, 0x03): 'ReadDiagnostics',

    # Upper byte 0x03
    (0x03, 0x01): 'Mode',
    (0x03, 0x02): 'Brightness',
    (0x03, 0x03): 'Zones',
    (0x03, 0x04): 'ZonesBrightness',
    (0x03, 0x05): 'AmbientColor',
    (0x03, 0x06): 'Saturation',
    (0x03, 0x08): 'AmbientModeType',
    (0x03, 0x09): 'MusicModeType',
    (0x03, 0x0A): 'MusicModeColors',
    (0x03, 0x0B): 'MusicModeWeights',
    (0x03, 0x0C): 'MinimumLuminosity',
    (0x03, 0x0D): 'AmbientScene',
    (0x03, 0x13): 'IndicatorLightAutoOff',
    (0x03, 0x14): 'USBPowerEnable',
    (0x03, 0x16): 'SectorData',
    (0x03, 0x17): 'SectorAssignment',
    (0x03, 0x18): 'SectorBroadcastControl',
    (0x03, 0x19): 'SectorBroadcastTiming',
    (0x03, 0x20): 'HDMIInput',
    (0x03, 0x21): 'MusicModeSource',
    (0x03, 0x23): 'HDMIInputName1',
    (0x03, 0x24): 'HDMIInputName2',
    (0x03, 0x25): 'HDMIInputName3',
    (0x03, 0x26): 'CECPassthroughEnable',
    (0x03, 0x27): 'CECSwitchingEnable',
    (0x03, 0x28): 'HPDEnable',
    (0x03, 0x2A): 'VideoFrameDelay',
    (0x03, 0x2B): 'LetterboxingEnable',
    (0x03, 0x2C): 'HDMIActiveChannels',
    (0x03, 0x2D): 'ColorBoost',
    (0x03, 0x2E): 'CECPowerEnable',
    (0x03, 0x2F): 'PillarboxingEnable',
    (0x03, 0x40): 'SKUSetup',
    (0x03, 0x41): 'FlexSetup',
    (0x03, 0x60): 'HDRToneRemapping',

    # Upper byte 0x04
    (0x04, 0x01): 'BootloaderFlags',
    (0x04, 0x02): 'ResetPIC',
    (0x04, 0x0D): 'ESPConnectedToWiFi',
    (0x04, 0x14): 'OtherConnectedToWiFi',
}
def calculate_crc(data):
    """Return the one-byte checksum of ``data`` using the CRC8 lookup table.

    ``data`` may be anything ``bytearray()`` accepts (bytes, bytearray, an
    iterable of ints in 0..255). The running value starts at zero and each
    byte is folded in through one table lookup.
    """
    checksum = 0x00
    for byte in bytearray(data):
        checksum = CRC8[(checksum ^ byte) & 0xFF]
    return checksum
class DreamScreenPackage:
    """A DreamScreen UDP packet parsed from the wire, plus a reply builder.

    Layout, as read by ``__init__`` and written by ``send``::

        magic | length | group | flags | upper cmd | lower cmd | payload | crc

    ``length`` equals ``len(payload) + 5``: the four header bytes after it,
    the payload, and the trailing CRC byte. The CRC covers the six header
    bytes followed by the payload.
    """

    def __init__(self, data, addr):
        """Parse the datagram ``data`` received from ``addr``.

        Raises:
            TypeError: if the datagram is shorter than the 7-byte minimum,
                starts with the wrong magic byte, carries a length byte that
                disagrees with the payload size, or fails the CRC check.
        """
        if len(data) < 7:
            raise TypeError(f'Package from {addr} too short: {data}')
        if data[0] != DS_MAGIC:
            raise TypeError(f'Wrong magic byte from {addr}: {data}')

        self.addr = addr
        (
            self.length,
            self.group,
            self.flags,
            self.upper_command,
            self.lower_command,
        ) = struct.unpack('xBBBBB', data[:6])
        # Order matters: the payload setter needs ``length`` and the crc
        # setter needs the payload and header fields.
        self.payload = data[6:-1]
        self.crc = data[-1]

    def __repr__(self):
        return (
            f'<{self.__class__.__name__}[{self.command}]: g: {self.group} '
            f'{self.payload}>')

    @property
    def payload(self):
        """Payload bytes between the header and the CRC byte."""
        return self._payload

    @payload.setter
    def payload(self, payload):
        # This is validation of untrusted network input, so raise the same
        # exception type as the other parse errors instead of asserting
        # (asserts vanish under ``python -O``).
        if len(payload) != self.length - 5:
            raise TypeError(
                f'Payload of {len(payload)} bytes does not match length '
                f'byte {self.length}')
        self._payload = payload

    @property
    def command(self):
        """Command name from COMMANDS, or an 'Unknown command' description."""
        code = self.upper_command, self.lower_command
        try:
            return COMMANDS[code]
        except KeyError:
            return 'Unknown command: %d:%d' % code

    @property
    def crc(self):
        """Checksum of header + payload, the same span ``send`` signs."""
        return calculate_crc(self.get_data() + self.payload)

    @crc.setter
    def crc(self, crc):
        """Verify the received checksum; the payload must be set first.

        Raises:
            TypeError: if ``crc`` does not match the computed checksum.
        """
        # NOTE(review): packets with an empty payload are accepted without a
        # CRC check, exactly as before - confirm whether that is intentional.
        if not self.payload:
            return
        expected_crc = calculate_crc(self.get_data() + self.payload)
        if expected_crc != crc:
            message = f'{crc} is not correct for {self.payload!r}'
            message += f', got: {expected_crc}'
            raise TypeError(message)

    def execute(self, protocol):
        """Schedule the ``protocol`` coroutine method named after the command.

        Prints a notice when the protocol has no handler for this command.
        """
        command = getattr(protocol, self.command, None)
        if command:
            asyncio.ensure_future(command(self), loop=protocol.loop)
        else:
            print('no command for', self)

    async def send(self, remote, payload=b'', **kwargs):
        """Send a packet built from this package's header over ``remote``.

        Args:
            remote: endpoint object exposing ``send(bytes)``.
            payload: bytes-like object, a list/tuple of byte values, or a
                single byte value.
            **kwargs: attributes to overwrite on this package before the
                header is built (for example ``flags=...``). These changes
                persist on the package.
        """
        if isinstance(payload, (bytes, bytearray, memoryview, list, tuple)):
            payload = bytes(payload)
        else:
            # A single int: wrap it, because bytes(n) would create n zeros.
            payload = bytes([payload])

        for k, v in kwargs.items():
            setattr(self, k, v)

        header = self.get_data(payload)
        # Concatenate directly: struct's 's' format without a count packs
        # exactly one byte, truncating longer payloads and padding empty ones.
        checksum = calculate_crc(header + payload)
        remote.send(bytes(header) + payload + bytes((checksum,)))

    def get_data(self, payload=None):
        """Return the six header bytes for ``payload`` as a bytearray.

        ``payload`` defaults to the package's own payload. An explicitly
        empty payload is honoured, so the length byte always describes the
        payload that is actually sent. Payloads longer than 250 bytes do not
        fit the one-byte length field and raise ValueError.
        """
        if payload is None:
            payload = self.payload
        return bytearray((
            DS_MAGIC,
            len(payload) + 5,
            self.group,
            self.flags,
            self.upper_command,
            self.lower_command,
        ))
class DreamScreenProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that answers DreamScreen packets like a SideKick.

    Each received datagram is parsed into a DreamScreenPackage, which then
    dispatches to the coroutine method on this class named after its command
    (``SubscribeToSectorData``, ``SectorData``).
    """

    # SectorData's layout indexes colours 0..11, so it needs this many.
    _SECTOR_COUNT = 12

    def __init__(self, loop):
        self.loop = loop
        # Remote endpoints keyed by sender address, opened lazily.
        self.remotes = {}
        super().__init__()
        # Timestamp of the previous frame, used for the frame-rate readout.
        self.last_update = datetime.now()
        print()

    async def get_remote(self, address):
        """Return the cached endpoint for ``address``, opening it on first use."""
        # NOTE(review): relies on an aioudp release that provides
        # open_remote_endpoint(); confirm the installed package offers it.
        if address not in self.remotes:
            self.remotes[address] = await aioudp.open_remote_endpoint(*address)
        return self.remotes[address]

    def connection_made(self, transport):
        print('transport', transport)
        self.transport = transport

    def datagram_received(self, data, addr):
        """Parse one datagram and dispatch it; malformed packets are dropped."""
        try:
            package = DreamScreenPackage(data, addr)
        except (TypeError, AssertionError) as e:
            # Any validation failure while parsing means a bad packet; report
            # it and carry on instead of raising inside the event loop.
            print(e)
            return
        package.execute(self)

    def error_received(self, exc):
        print('error', exc)

    @classmethod
    async def listen(cls, loop):
        """Bind to (DS_HOST, DS_PORT) and return ``(transport, protocol)``."""
        return await loop.create_datagram_endpoint(
            lambda: cls(loop), local_addr=(DS_HOST, DS_PORT))

    async def SubscribeToSectorData(self, package):
        """Reply to the sender with PKT_RESPONSE_REQUEST and payload 0x01."""
        await self.send(package, flags=PKT_RESPONSE_REQUEST, payload=0x01)

    @staticmethod
    def _print_row(*args):
        """Print the arguments as one line with no separators.

        Lists, tuples and generators are flattened into their items; every
        item is converted with ``str``.
        """
        parts = []
        for arg in args:
            if isinstance(arg, (types.GeneratorType, list, tuple)):
                parts.extend(map(str, arg))
            else:
                parts.append(str(arg))
        # The original called ``output.rjust(term.width)`` here and discarded
        # the result; the no-op is dropped. Padding is not applied because
        # the ANSI escapes in the text would make the width wrong.
        print(''.join(parts))

    async def SectorData(self, package):
        """Redraw the sector colours and the update rate in the terminal.

        The payload is read as consecutive (red, green, blue) byte triples.
        Packets that are not a whole number of triples, or that carry fewer
        than twelve colours, are reported and skipped.
        """
        try:
            colors = [
                Color(index, red, green, blue)
                for index, (red, green, blue)
                in enumerate(struct.iter_unpack('BBB', package.payload))
            ]
        except struct.error as error:
            print(f'Malformed sector data from {package.addr}: {error}')
            return
        if len(colors) < self._SECTOR_COUNT:
            print(
                f'Sector data from {package.addr} has {len(colors)} colors, '
                f'expected at least {self._SECTOR_COUNT}')
            return

        p = self._print_row
        blank = 80 * ' '

        # Move the cursor back over the previous frame before redrawing;
        # ``term`` is the module-level blessings terminal.
        print(term.move_up * 9)
        p('colors: ', colors)
        p(blank)
        p(colors[i] for i in (7, 6, 5, 4, 3))
        p(f'{colors[8]} {colors[2]}')
        p(colors[i] for i in (9, 10, 11, 0, 1))
        p(blank)

        last_update = datetime.now()
        duration = (last_update - self.last_update).total_seconds()
        # Two frames can share a timestamp on coarse clocks; avoid dividing
        # by zero and show an infinite rate instead.
        rate = 1 / duration if duration else float('inf')
        p('%.3f :: %s' % (rate, duration))
        self.last_update = last_update
        p(blank)

    async def send(self, package, *args, **kwargs):
        """Send a reply for ``package`` back to the address it came from."""
        remote = await self.get_remote(package.addr)
        await package.send(remote, *args, **kwargs)
class Color:
    """An indexed RGB colour that renders as an ANSI true-colour swatch."""

    def __init__(self, index, red, green, blue):
        self.index, self.red = index, red
        self.green, self.blue = green, blue

    def __str__(self):
        """Return the index centred in five columns on this background colour.

        The text is wrapped in a 24-bit background escape sequence and
        followed by an attribute reset.
        """
        rgb = f'{self.red};{self.green};{self.blue}'
        label = f'{self.index:^5}'
        return f'\033[48;2;{rgb}m{label}\033[0m'
def main(loop):
    """Start the UDP listener on ``loop`` and serve until interrupted.

    The listening transport is closed when the loop stops, whether it stopped
    normally or through an exception.
    """
    # Emit blank lines first; presumably this reserves room for the block
    # that SectorData redraws in place.
    print('\n' * 6)
    transport, _protocol = loop.run_until_complete(
        DreamScreenProtocol.listen(loop))
    try:
        loop.run_forever()
    finally:
        transport.close()
if __name__ == '__main__':
    # asyncio.get_event_loop() is deprecated when no loop is running and
    # fails on newer Python versions, so create and install a loop
    # explicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # ``term`` must stay a module-level global: DreamScreenProtocol.SectorData
    # reads it to move the cursor.
    term = blessings.Terminal()
    try:
        main(loop)
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It looks like the original aioudp package has been replaced with a different one. The code can be changed to support the new aioudp package, but instead of being able to do `await aioudp.open_remote_endpoint(*address)` we would need something built around an `async with` block. That does mean the code needs to be restructured a bit, since the current code doesn't (cleanly) support an `async with` statement.