Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@delfick
Last active March 17, 2021 19:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save delfick/0554232f28278690b7c7efe4518e501b to your computer and use it in GitHub Desktop.
Save delfick/0554232f28278690b7c7efe4518e501b to your computer and use it in GitHub Desktop.
from photons_app.errors import PhotonsAppError
from photons_app.actions import an_action
from photons_app import helpers as hp
from photons_messages import TileMessages, DeviceMessages, TileEffectType
from photons_control import test_helpers as chp
from photons_control.tile import SetTileEffect
from photons_products import Products
import itertools
import logging
# Module-level logger named after this script; nothing in the code shown here logs through it yet.
log = logging.getLogger("is_it_a_tile")
def _ensure(condition, *detail):
    """Raise ``AssertionError(*detail)`` unless ``condition`` is truthy.

    Used in place of the ``assert`` statement so that the device checks are
    still performed when Python runs with ``-O`` (which strips ``assert``).
    Failures keep the ``AssertionError`` type that ``assert`` would have raised.
    """
    if not condition:
        raise AssertionError(*detail)


async def _check_positions(sender, serial, parts):
    """Move every part's user_x/user_y by one and confirm the device reports it.

    ``parts`` is the "parts" plan result gathered earlier; each part supplies
    ``part_number``, ``user_x`` and ``user_y``.

    Raises ``AssertionError`` if a gather is incomplete or if the reported
    positions never match the requested ones.
    """
    wanted = [(part.part_number, part.user_x + 1, part.user_y + 1) for part in parts]
    messages = [
        TileMessages.SetUserPosition(
            tile_index=number,
            user_x=user_x,
            user_y=user_y,
            res_required=False,
        )
        for number, user_x, user_y in wanted
    ]
    await sender(messages, serial)

    now_have = None
    # Try asking for new positions until they have changed
    async with hp.tick(0.3, max_time=2) as ticker:
        async for _ in ticker:
            plans = sender.make_plans("parts")
            got = await sender.gatherer.gather_all(plans, serial)
            # Unlike the later checks, an incomplete gather fails straight away
            # here instead of being retried on the next tick.
            _ensure(got[serial][0], got)
            now_have = [
                (part.part_number, part.user_x, part.user_y) for part in got[serial][1]["parts"]
            ]
            if now_have == wanted:
                break

    # Also covers the case where the loop never ran (now_have is still None)
    _ensure(now_have == wanted, (now_have, wanted))


async def _check_effects(sender, serial):
    """Cycle MORPH, OFF, FLAME, OFF and confirm each is reported back.

    Raises ``AssertionError`` if, once polling stops, the last gather was
    incomplete or reports a different effect type than the one just set.
    """
    for effect in (
        TileEffectType.MORPH,
        TileEffectType.OFF,
        TileEffectType.FLAME,
        TileEffectType.OFF,
    ):
        await sender(SetTileEffect(effect, power_on=False), serial)

        got = None
        async with hp.tick(0.3, max_time=2) as ticker:
            async for _ in ticker:
                plans = sender.make_plans("firmware_effects")
                got = await sender.gatherer.gather_all(plans, serial)
                if got[serial][0] and got[serial][1]["firmware_effects"]["type"] == effect:
                    break

        # Note that a real tile takes a few seconds for the effect to display on hardware
        # Guard first so a loop that never ran fails clearly rather than with a TypeError
        _ensure(got is not None, "Never asked the device for its firmware effect")
        _ensure(got[serial][0], got)
        _ensure(got[serial][1]["firmware_effects"]["type"] == effect, (got, effect))


async def _check_colors(sender, serial, parts):
    """Write two different sets of 64 colors to every part and read them back.

    One ``Set64`` is sent per entry in ``parts`` (indexed by position in the
    list), then the "colors" plan is polled until it equals what was sent.
    Mismatching colors are printed before the final check fails.

    Raises ``AssertionError`` if the last gather was incomplete, if the number
    of parts reported differs from the number written, or if any color differs.
    """
    for addition in (0, 1):
        # Make sure we can change hsbk values
        # The weird saturation value is because all LIFX products have this annoying thing they do
        # Where if the saturation is 0, then it doesn't actually change the hue value.....
        expected = [
            [chp.Color(i + 50 + addition, 0.5 + addition / 2, 1, 3500) for _ in range(64)]
            for i in range(len(parts))
        ]
        setters = [
            TileMessages.Set64(
                tile_index=i,
                x=0,
                y=0,
                length=1,
                colors=colors,
                width=8,
                duration=0,
                res_required=False,
            )
            for i, colors in enumerate(expected)
        ]
        await sender(setters, serial)

        got = None
        async with hp.tick(0.3, max_time=2) as ticker:
            async for _ in ticker:
                plans = sender.make_plans("colors")
                got = await sender.gatherer.gather_all(plans, serial)
                if got[serial][0] and got[serial][1]["colors"] == expected:
                    break

        # Guard first so a loop that never ran fails clearly rather than with a TypeError
        _ensure(got is not None, "Never asked the device for its colors")
        _ensure(got[serial][0], got)

        # Report every differing pixel before failing so the mismatch is easy to locate
        for i, (p, e) in enumerate(itertools.zip_longest(got[serial][1]["colors"], expected)):
            # zip_longest pads with None when the two lists differ in length
            _ensure(p is not None and e is not None, (got, expected))
            for j, (pc, ec) in enumerate(itertools.zip_longest(p, e)):
                if pc != ec:
                    print(f"Part {i}, Color {j}, Got {pc}, Want {ec}")

        _ensure(got[serial][1]["colors"] == expected)


@an_action(needs_target=True, special_reference=True)
async def is_it_a_tile(collector, target, reference, **kwargs):
    """Check that exactly one targeted device behaves like an LCM3 tile.

    The device is sent ``SetPower(level=65535)``, must report the
    ``Products.LCM3_TILE`` product with at least one part, all of width 8, and
    must accept and report back changes to part positions, firmware effects
    and pixel colors.

    Note that the position and color changes written to the device are not
    reverted afterwards.

    Raises:
        PhotonsAppError: if the reference does not resolve to exactly one
            device, or the initial information could not be fully gathered.
        AssertionError: if any of the behavioural checks fail. These are
            raised explicitly, so they still fire under ``python -O``.
    """
    async with target.session() as sender:
        # Let's just guarantee the script is against one device
        _, serials = await reference.find(sender, timeout=2)
        if len(serials) != 1:
            raise PhotonsAppError("Please target exactly one device", got=serials)
        serial = serials[0]

        await sender(DeviceMessages.SetPower(level=65535), serial)

        # https://photons.delfick.com/interacting/gatherer_interface.html
        # This code sends a GetVersion, GetHostFirmware, GetDeviceChain, Get64 and puts it all together
        plans = sender.make_plans("capability", "parts")
        got = await sender.gatherer.gather_all(plans, serial)
        complete, info = got[serial]
        if not complete:
            raise PhotonsAppError("Couldn't get all information", got=got, serial=serial)

        # Make sure it advertises itself as a tile
        _ensure(info["capability"]["cap"].product is Products.LCM3_TILE)
        parts = info["parts"]
        _ensure(
            all(part.width == 8 for part in parts),
            [(part, part.width) for part in parts],
        )
        _ensure(len(parts) > 0, parts)

        # Make sure we can change the user_x/user_y of the tile
        await _check_positions(sender, serial, parts)

        # Make sure firmware effects can be started and stopped
        await _check_effects(sender, serial)

        # Make sure we can change hsbk values
        await _check_colors(sender, serial, parts)
if __name__ == "__main__":
    # Only pull in photons_core when executed as a script, then hand it the
    # task specification for this action.
    # NOTE(review): "{@:1:}" is presumably expanded by photons_core from the
    # command-line arguments - confirm against the Photons documentation.
    import photons_core

    photons_core.run("lan:is_it_a_tile {@:1:}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment