Last active
January 16, 2024 20:12
-
-
Save wolph/ff13e3a78387dae56967c92f72960117 to your computer and use it in GitHub Desktop.
This script emulates the Dreamscreen sidekick and displays the output on your screen. It requires a shell that supports full-rgb ansi. More info can be found here: http://dreamscreen.boards.net/post/6271/thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import enum | |
import types | |
import aioudp | |
import struct | |
import asyncio | |
import blessings | |
from datetime import datetime | |
# Credits: Largely based on https://github.com/HugoPeters/DreamStream | |
DS_HOST = '0.0.0.0' | |
DS_PORT = 8888 | |
DS_MAGIC = 0xFC | |
PKT_WRITE_CONSTANT = 1 << 0 | |
PKT_WRITE = 1 << 1 | |
PKT_RESPONSE_REQUEST = 1 << 4 | |
PKT_BROADCAST = 1 << 5 | |
PKT_RESPONSE = 1 << 6 | |
CRC8 = [ | |
0x00, 0x07, 0x0E, 0x09, 0x1C, 0x1B, 0x12, 0x15, 0x38, 0x3F, 0x36, 0x31, | |
0x24, 0x23, 0x2A, 0x2D, 0x70, 0x77, 0x7E, 0x79, 0x6C, 0x6B, 0x62, 0x65, | |
0x48, 0x4F, 0x46, 0x41, 0x54, 0x53, 0x5A, 0x5D, 0xE0, 0xE7, 0xEE, 0xE9, | |
0xFC, 0xFB, 0xF2, 0xF5, 0xD8, 0xDF, 0xD6, 0xD1, 0xC4, 0xC3, 0xCA, 0xCD, | |
0x90, 0x97, 0x9E, 0x99, 0x8C, 0x8B, 0x82, 0x85, 0xA8, 0xAF, 0xA6, 0xA1, | |
0xB4, 0xB3, 0xBA, 0xBD, 0xC7, 0xC0, 0xC9, 0xCE, 0xDB, 0xDC, 0xD5, 0xD2, | |
0xFF, 0xF8, 0xF1, 0xF6, 0xE3, 0xE4, 0xED, 0xEA, 0xB7, 0xB0, 0xB9, 0xBE, | |
0xAB, 0xAC, 0xA5, 0xA2, 0x8F, 0x88, 0x81, 0x86, 0x93, 0x94, 0x9D, 0x9A, | |
0x27, 0x20, 0x29, 0x2E, 0x3B, 0x3C, 0x35, 0x32, 0x1F, 0x18, 0x11, 0x16, | |
0x03, 0x04, 0x0D, 0x0A, 0x57, 0x50, 0x59, 0x5E, 0x4B, 0x4C, 0x45, 0x42, | |
0x6F, 0x68, 0x61, 0x66, 0x73, 0x74, 0x7D, 0x7A, 0x89, 0x8E, 0x87, 0x80, | |
0x95, 0x92, 0x9B, 0x9C, 0xB1, 0xB6, 0xBF, 0xB8, 0xAD, 0xAA, 0xA3, 0xA4, | |
0xF9, 0xFE, 0xF7, 0xF0, 0xE5, 0xE2, 0xEB, 0xEC, 0xC1, 0xC6, 0xCF, 0xC8, | |
0xDD, 0xDA, 0xD3, 0xD4, 0x69, 0x6E, 0x67, 0x60, 0x75, 0x72, 0x7B, 0x7C, | |
0x51, 0x56, 0x5F, 0x58, 0x4D, 0x4A, 0x43, 0x44, 0x19, 0x1E, 0x17, 0x10, | |
0x05, 0x02, 0x0B, 0x0C, 0x21, 0x26, 0x2F, 0x28, 0x3D, 0x3A, 0x33, 0x34, | |
0x4E, 0x49, 0x40, 0x47, 0x52, 0x55, 0x5C, 0x5B, 0x76, 0x71, 0x78, 0x7F, | |
0x6A, 0x6D, 0x64, 0x63, 0x3E, 0x39, 0x30, 0x37, 0x22, 0x25, 0x2C, 0x2B, | |
0x06, 0x01, 0x08, 0x0F, 0x1A, 0x1D, 0x14, 0x13, 0xAE, 0xA9, 0xA0, 0xA7, | |
0xB2, 0xB5, 0xBC, 0xBB, 0x96, 0x91, 0x98, 0x9F, 0x8A, 0x8D, 0x84, 0x83, | |
0xDE, 0xD9, 0xD0, 0xD7, 0xC2, 0xC5, 0xCC, 0xCB, 0xE6, 0xE1, 0xE8, 0xEF, | |
0xFA, 0xFD, 0xF4, 0xF3 | |
] | |
class DEVICE_TYPE(enum.IntEnum): | |
DreamScreenHD = 0x1 | |
DreamScreen4K = 0x2 | |
SideKick = 0x3 | |
class DEVICE_MODE(enum.IntEnum): | |
Sleep = 0x0 | |
Video = 0x1 | |
Music = 0x2 | |
Ambient = 0x3 | |
class AMBIENT_MODE(enum.IntEnum): | |
RGB = 0x0 | |
Scene = 0x1 | |
class AMBIENT_SCENE(enum.IntEnum): | |
RandomColor = 0x0 | |
Fireside = 0x1 | |
Twinkle = 0x2 | |
Ocean = 0x3 | |
Rainbow = 0x4 | |
July4th = 0x5 | |
Holiday = 0x6 | |
Pop = 0x7 | |
EnchantedForest = 0x8 | |
class DeviceState: | |
m_device_type = DEVICE_TYPE.SideKick | |
m_brightness = 255 | |
m_ambient_mode_type = AMBIENT_MODE.RGB | |
m_ambient_show_type = AMBIENT_SCENE.RandomColor | |
m_fade_rate = 4 | |
m_group_number = 1 | |
m_mode = DEVICE_MODE.Ambient | |
m_esp_firmware = 0, 0 | |
m_ambient_color = [0xFF, 0, 0] | |
m_saturation = [0xFF, 0xFF, 0xFF] | |
m_sector_assignment = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] | |
m_name = "DreamStream" | |
m_group_name = "unassigned" | |
class PacketHeader: | |
m_magic = 0 | |
m_length = 0 | |
m_groupaddr = 0 | |
m_flags = 0 | |
m_cmd_upper = 0 | |
m_cmd_lower = 0 | |
# Source: | |
# https://github.com/HugoPeters/DreamStream/blob/master/src/esp32/esp32.ino | |
COMMANDS = { | |
(0xFF, 0xFF): 'INVALID', | |
(0x01, 0x05): 'ResetESP', | |
(0x01, 0x07): 'Name', | |
(0x01, 0x08): 'GroupName', | |
(0x01, 0x09): 'GroupNumber', | |
(0x01, 0x0A): 'CurrentState', | |
(0x01, 0x0B): 'Ping', | |
(0x01, 0x0C): 'SubscribeToSectorData', | |
(0x01, 0x15): 'ReadBootloaderMode', | |
(0x02, 0x02): 'ReadPICVersionNumber', | |
(0x02, 0x03): 'ReadDiagnostics', | |
(0x03, 0x01): 'Mode', | |
(0x03, 0x02): 'Brightness', | |
(0x03, 0x03): 'Zones', | |
(0x03, 0x04): 'ZonesBrightness', | |
(0x03, 0x05): 'AmbientColor', | |
(0x03, 0x06): 'Saturation', | |
(0x03, 0x08): 'AmbientModeType', | |
(0x03, 0x09): 'MusicModeType', | |
(0x03, 0x0A): 'MusicModeColors', | |
(0x03, 0x0B): 'MusicModeWeights', | |
(0x03, 0x0C): 'MinimumLuminosity', | |
(0x03, 0x0D): 'AmbientScene', | |
(0x03, 0x13): 'IndicatorLightAutoOff', | |
(0x03, 0x14): 'USBPowerEnable', | |
(0x03, 0x16): 'SectorData', | |
(0x03, 0x17): 'SectorAssignment', | |
(0x03, 0x18): 'SectorBroadcastControl', | |
(0x03, 0x19): 'SectorBroadcastTiming', | |
(0x03, 0x20): 'HDMIInput', | |
(0x03, 0x21): 'MusicModeSource', | |
(0x03, 0x23): 'HDMIInputName1', | |
(0x03, 0x24): 'HDMIInputName2', | |
(0x03, 0x25): 'HDMIInputName3', | |
(0x03, 0x26): 'CECPassthroughEnable', | |
(0x03, 0x27): 'CECSwitchingEnable', | |
(0x03, 0x28): 'HPDEnable', | |
(0x03, 0x2A): 'VideoFrameDelay', | |
(0x03, 0x2B): 'LetterboxingEnable', | |
(0x03, 0x2C): 'HDMIActiveChannels', | |
(0x03, 0x2D): 'ColorBoost', | |
(0x03, 0x2E): 'CECPowerEnable', | |
(0x03, 0x2F): 'PillarboxingEnable', | |
(0x03, 0x40): 'SKUSetup', | |
(0x03, 0x41): 'FlexSetup', | |
(0x03, 0x60): 'HDRToneRemapping', | |
(0x04, 0x01): 'BootloaderFlags', | |
(0x04, 0x02): 'ResetPIC', | |
(0x04, 0x0D): 'ESPConnectedToWiFi', | |
(0x04, 0x14): 'OtherConnectedToWiFi', | |
} | |
def calculate_crc(data): | |
data = bytearray(data) | |
crc = 0x00 | |
for i in range(len(data)): | |
crc = CRC8[(crc ^ data[i]) & 0xFF] | |
return crc | |
class DreamScreenPackage: | |
def __init__(self, data, addr): | |
if len(data) < 7: | |
raise TypeError(f'Package from {addr} too short: {data}') | |
elif data[0] != DS_MAGIC: | |
raise TypeError(f'Wrong magic byte from {addr}: {data}') | |
self.addr = addr | |
self.length, self.group, self.flags, self.upper_command, \ | |
self.lower_command = struct.unpack('xBBBBB', data[:6]) | |
self.payload = data[6:-1] | |
self.crc = data[-1] | |
def __repr__(self): | |
return ( | |
f'<{self.__class__.__name__}[{self.command}]: g: {self.group} ' | |
f'{self.payload}>') | |
@property | |
def payload(self): | |
return self._payload | |
@payload.setter | |
def payload(self, payload): | |
assert len(payload) == self.length - 5 | |
self._payload = payload | |
@property | |
def command(self): | |
code = self.upper_command, self.lower_command | |
return COMMANDS.get(code, 'Unknown command: %d:%d' % code) | |
@property | |
def crc(self): | |
return calculate_crc(self.payload) | |
@crc.setter | |
def crc(self, crc): | |
assert self.payload is not None, 'Payload needs to be set first' | |
if self.payload: | |
data = self.get_data() | |
expected_crc = calculate_crc(data + self.payload) | |
message = f'{crc} is not correct for {self.payload!r}' | |
message += f', got: {expected_crc}' | |
assert expected_crc == crc, message | |
def execute(self, protocol): | |
command = getattr(protocol, self.command, None) | |
if command: | |
asyncio.ensure_future(command(self), loop=protocol.loop) | |
else: | |
print('no command for', self) | |
async def send(self, remote, payload=b'', **kwargs): | |
if isinstance(payload, bytes): | |
pass | |
else: | |
if isinstance(payload, (list, tuple)): | |
payload = list(payload) | |
else: | |
payload = [payload] | |
payload = bytes(payload) | |
for k, v in kwargs.items(): | |
setattr(self, k, v) | |
data = self.get_data(payload) | |
message = struct.pack('BBBBBB', *data) | |
message += struct.pack('s', payload) | |
message += struct.pack('B', calculate_crc(data + payload)) | |
remote.send(message) | |
def get_data(self, payload=None): | |
payload = payload or self.payload | |
return bytearray(( | |
DS_MAGIC, | |
len(payload) + 5, | |
self.group, | |
self.flags, | |
self.upper_command, | |
self.lower_command, | |
)) | |
class DreamScreenProtocol(asyncio.DatagramProtocol): | |
def __init__(self, loop): | |
self.loop = loop | |
self.remotes = {} | |
super().__init__() | |
self.last_update = datetime.now() | |
print() | |
async def get_remote(self, address): | |
if address not in self.remotes: | |
self.remotes[address] = await aioudp.open_remote_endpoint(*address) | |
return self.remotes[address] | |
def connection_made(self, transport): | |
print('transport', transport) | |
self.transport = transport | |
def datagram_received(self, data, addr): | |
try: | |
package = DreamScreenPackage(data, addr) | |
except TypeError as e: | |
print(e) | |
return | |
package.execute(self) | |
def error_received(self, exc): | |
print('error', exc) | |
@classmethod | |
async def listen(cls, loop): | |
return await loop.create_datagram_endpoint( | |
lambda: cls(loop), local_addr=(DS_HOST, DS_PORT)) | |
async def SubscribeToSectorData(self, package): | |
await self.send(package, flags=PKT_RESPONSE_REQUEST, payload=0x01) | |
async def SectorData(self, package): | |
colors = [] | |
i = 0 | |
for r, g, b in struct.iter_unpack('BBB', package.payload): | |
colors.append(Color(i, r, g, b)) | |
i += 1 | |
def p(*args): | |
output = [] | |
for arg in args: | |
if isinstance(arg, (types.GeneratorType, list, tuple)): | |
output += list(map(str, arg)) | |
elif isinstance(arg, str): | |
output.append(arg) | |
else: | |
output.append(str(arg)) | |
output = ''.join(output) | |
output.rjust(term.width) | |
print(output) | |
print(term.move_up * 9) | |
p('colors: ', colors) | |
p(80 * ' ') | |
p(colors[i] for i in (7, 6, 5, 4, 3)) | |
p(f'{colors[8]} {colors[2]}') | |
p(colors[i] for i in (9, 10, 11, 0, 1)) | |
p(80 * ' ') | |
last_update = datetime.now() | |
duration = (last_update - self.last_update).total_seconds() | |
p('%.3f :: %s' % (1 / duration, duration)) | |
self.last_update = last_update | |
p(80 * ' ') | |
async def send(self, package, *args, **kwargs): | |
remote = await self.get_remote(package.addr) | |
await package.send(remote, *args, **kwargs) | |
class Color: | |
def __init__(self, index, red, green, blue): | |
self.index = index | |
self.red = red | |
self.green = green | |
self.blue = blue | |
def __str__(self): | |
color_cmd = f'\033[48;2;{self.red};{self.green};{self.blue}m' | |
reset_cmd = '\033[0m' | |
return f'{color_cmd}{self.index:^5}{reset_cmd}' | |
def main(loop): | |
print('\n' * 6) | |
transport, protocol = loop.run_until_complete( | |
DreamScreenProtocol.listen(loop)) | |
try: | |
loop.run_forever() | |
finally: | |
transport.close() | |
if __name__ == '__main__': | |
loop = asyncio.get_event_loop() | |
term = blessings.Terminal() | |
try: | |
main(loop) | |
finally: | |
loop.close() |
It looks like the original aioudp package has been replaced with a different one. The code can be changed to support the aioudp
package, but instead of being able to do await aioudp.open_remote_endpoint(*address)
we would need something like this:
async with aioudp.connect(*address) as connection:
...
That does mean the code needs to be restructured a bit since the current code doesn't (cleanly) support an async with statement.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi could you maybe help me set this up? I keep getting the error
future:`` <Task finished coro=<DreamScreenProtocol.SubscribeToSectorData() done, defined at dsemu.py:294> exception=AttributeError("module 'aioudp' has no attribute 'open_remote_endpoint'")> Traceback (most recent call last): File "dsemu.py", line 295, in SubscribeToSectorData await self.send(package, flags=PKT_RESPONSE_REQUEST, payload=0x01) File "dsemu.py", line 332, in send remote = await self.get_remote(package.addr) File "dsemu.py", line 269, in get_remote self.remotes[address] = await aioudp.open_remote_endpoint(*address) AttributeError: module 'aioudp' has no attribute 'open_remote_endpoint' Task exception was never retrieved
And theres no sidekick shown in the app. I tried downgrading aioudp to older releases without succes.