-
-
Save delfick/0554232f28278690b7c7efe4518e501b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from photons_app.errors import PhotonsAppError | |
from photons_app.actions import an_action | |
from photons_app import helpers as hp | |
from photons_messages import TileMessages, DeviceMessages, TileEffectType | |
from photons_control import test_helpers as chp | |
from photons_control.tile import SetTileEffect | |
from photons_products import Products | |
import itertools | |
import logging | |
log = logging.getLogger("is_it_a_tile") | |
@an_action(needs_target=True, special_reference=True) | |
async def is_it_a_tile(collector, target, reference, **kwargs): | |
async with target.session() as sender: | |
# Let's just guarantee the script is against one device | |
_, serials = await reference.find(sender, timeout=2) | |
if len(serials) != 1: | |
raise PhotonsAppError("Please target exactly one device", got=serials) | |
serial = serials[0] | |
await sender(DeviceMessages.SetPower(level=65535), serial) | |
# https://photons.delfick.com/interacting/gatherer_interface.html | |
# This code sends a GetVersion, GetHostFirmware, GetDeviceChain, Get64 and puts it all together | |
plans = sender.make_plans("capability", "parts") | |
got = await sender.gatherer.gather_all(plans, serial) | |
complete, info = got[serial] | |
if not complete: | |
raise PhotonsAppError("Couldn't get all information", got=got, serial=serial) | |
# Make sure it advertises itself as a tile | |
assert info["capability"]["cap"].product is Products.LCM3_TILE | |
assert all(part.width == 8 for part in info["parts"]), [ | |
(part, part.width) for part in info["parts"] | |
] | |
parts = info["parts"] | |
assert len(parts) > 0, parts | |
# Make sure we can change the user_x/user_y of the tile | |
new_positions = [] | |
change_positions = [] | |
for part in parts: | |
change_positions.append( | |
TileMessages.SetUserPosition( | |
tile_index=part.part_number, | |
user_x=part.user_x + 1, | |
user_y=part.user_y + 1, | |
res_required=False, | |
) | |
) | |
new_positions.append((part.part_number, part.user_x + 1, part.user_y + 1)) | |
await sender(change_positions, serial) | |
now_have = None | |
# Try asking for new positions until they have changed | |
async with hp.tick(0.3, max_time=2) as ticker: | |
async for _ in ticker: | |
plans = sender.make_plans("parts") | |
got = await sender.gatherer.gather_all(plans, serial) | |
assert got[serial][0], got | |
now_have = [ | |
(part.part_number, part.user_x, part.user_y) for part in got[serial][1]["parts"] | |
] | |
if now_have == new_positions: | |
break | |
assert now_have == new_positions, (now_have, new_positions) | |
for effect in ( | |
TileEffectType.MORPH, | |
TileEffectType.OFF, | |
TileEffectType.FLAME, | |
TileEffectType.OFF, | |
): | |
await sender(SetTileEffect(effect, power_on=False), serial) | |
got = None | |
async with hp.tick(0.3, max_time=2) as ticker: | |
async for _ in ticker: | |
plans = sender.make_plans("firmware_effects") | |
got = await sender.gatherer.gather_all(plans, serial) | |
if got[serial][0] and got[serial][1]["firmware_effects"]["type"] == effect: | |
break | |
# Note that a real tile takes a few seconds for the effect to display on hardware | |
assert got[serial][0], got | |
assert got[serial][1]["firmware_effects"]["type"] == effect, (got, effect) | |
for addition in (0, 1): | |
# Make sure we can change hsbk values | |
setters = [] | |
expected = [] | |
for i, part in enumerate(parts): | |
# The weird saturation value is because all LIFX products have this annoying thing they do | |
# Where if the saturation is 0, then it doesn't actually change the hue value..... | |
colors = [ | |
chp.Color(i + 50 + addition, 0.5 + addition / 2, 1, 3500) for _ in range(64) | |
] | |
expected.append(colors) | |
setters.append( | |
TileMessages.Set64( | |
tile_index=i, | |
x=0, | |
y=0, | |
length=1, | |
colors=colors, | |
width=8, | |
duration=0, | |
res_required=False, | |
) | |
) | |
await sender(setters, serial) | |
got = None | |
async with hp.tick(0.3, max_time=2) as ticker: | |
async for _ in ticker: | |
plans = sender.make_plans("colors") | |
got = await sender.gatherer.gather_all(plans, serial) | |
if got[serial][0] and got[serial][1]["colors"] == expected: | |
break | |
assert got[serial][0], got | |
for i, (p, e) in enumerate(itertools.zip_longest(got[serial][1]["colors"], expected)): | |
assert p is not None and e is not None, (got, expected) | |
for j, (pc, ec) in enumerate(itertools.zip_longest(p, e)): | |
if pc != ec: | |
print(f"Part {i}, Color {j}, Got {pc}, Want {ec}") | |
assert got[serial][1]["colors"] == expected | |
if __name__ == "__main__": | |
__import__("photons_core").run("lan:is_it_a_tile {@:1:}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment