Skip to content

Instantly share code, notes, and snippets.

@delfick

delfick/cycle.py Secret

Last active March 22, 2020 00:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save delfick/e818308768b88f10e9e58a11552c109a to your computer and use it in GitHub Desktop.
Save delfick/e818308768b88f10e9e58a11552c109a to your computer and use it in GitHub Desktop.
from photons_app.actions import an_action
from photons_control.script import Pipeline, Repeater
from photons_colour import Parser
from delfick_project.addons import addon_hook
import logging
# Logger named "cycle"; the error catcher inside the cycle action writes to it
# at debug level.
log = logging.getLogger("cycle")
@addon_hook(
    extras=[
        ("lifx.photons", module)
        for module in ("colour", "transport", "messages", "device_finder")
    ]
)
def __lifx__(collector, *args, **kwargs):
    """
    Addon hook for this script.

    The decorator is given the ``lifx.photons`` modules ``colour``,
    ``transport``, ``messages`` and ``device_finder`` as extras. The hook
    body itself intentionally does nothing.
    """
@an_action(needs_target=True, special_reference=True)
async def cycle(collector, target, reference, **kwargs):
    """
    Loop forever, cycling the referenced devices through a list of colours

    ``cycle d073d5000001,d073d5000002``

    or all devices on your network

    ``cycle``

    or say all the devices in a particular group

    ``cycle match:group_name=kitchen``

    Note that at the end of each cycle, we will do a discovery again
    """
    spread = 1

    colours = [
        "blue",
        "red",
        "orange",
        "yellow",
        "cyan",
        "green",
        "blue",
        "#6cd67a",
        "hue:200 saturation:0.5",
    ]

    def make_msg(colour):
        # A fresh overrides dict is built for every message, so no message
        # shares a mutable dict with another.
        return Parser.color_to_msg(
            colour,
            overrides={
                "res_required": False,
                "duration": spread,
                "brightness": None,
            },
        )

    color_msgs = list(map(make_msg, colours))

    # One pass over all the colour messages, repeated by the Repeater; the
    # minimum loop time is one ``spread`` per colour.
    one_pass = Pipeline(*color_msgs, spread=spread, synchronized=True)
    forever = Repeater(one_pass, min_loop_time=len(colours) * spread)

    def log_error(e):
        # Errors are only recorded at debug level; nothing is raised here.
        log.debug(e)

    script = target.script(forever)
    await script.run_with_all(
        reference, error_catcher=log_error, message_timeout=spread
    )
if __name__ == "__main__":
    # When run as a script, start the photons app with the "lan:cycle"
    # argument followed by whatever was passed on the command line.
    import sys

    from photons_app.executor import main

    main(["lan:cycle", *sys.argv[1:]])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment