Created
December 16, 2019 09:34
-
-
Save delfick/391d4f686159bd22f365ee2782079f6e to your computer and use it in GitHub Desktop.
An example photons script that creates a flicker effect on one or more devices.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from photons_app.actions import an_action | |
from photons_app import helpers as hp | |
from photons_control.script import FromGeneratorPerSerial | |
from photons_messages import LightMessages | |
from delfick_project.norms import sb, Meta, BadSpecValue | |
from delfick_project.addons import addon_hook | |
import logging | |
import asyncio | |
import random | |
import time | |
# Module level logger shared by the flicker script and the flicker action
log = logging.getLogger("info")
@addon_hook(
    extras=[
        ("lifx.photons", "transport"),
        ("lifx.photons", "messages"),
        ("lifx.photons", "device_finder"),
    ]
)
def __lifx__(collector, *args, **kwargs):
    """
    Addon hook for this script.

    It does no work of its own; it exists to carry the ``addon_hook``
    decorator, which names the ``lifx.photons`` addons listed in ``extras``
    (presumably so they are loaded before the action runs - confirm against
    the delfick_project addon documentation).
    """
class between_0_and_1(sb.Spec):
    """A norms spec that only accepts a float in the inclusive range 0 to 1"""

    def normalise_filled(self, meta, val):
        """
        Convert ``val`` using ``sb.float_spec`` and return the result.

        Raises ``BadSpecValue`` when the converted number is below 0 or
        above 1.
        """
        number = sb.float_spec().normalise(meta, val)
        out_of_range = number < 0 or number > 1
        if out_of_range:
            raise BadSpecValue("Number must be between 0 and 1")
        return number
def get_random_brightness(original_brightness, max_drop):
    """
    Return a new brightness between 0 and 1 that is the original brightness
    minus a percentage that is never more than max_drop

    The drop is chosen in whole percent steps from 0 up to, but not including,
    ``max_drop``. When ``max_drop`` rounds to less than one percent there is
    nothing to choose from and the brightness is not dropped at all.

    original_brightness
        The brightness all changes are relative to

    max_drop
        The largest fraction (0 to 1) of the original brightness that may be
        taken away
    """
    # random.randrange only accepts integers: a float stop value is rejected
    # by modern Pythons, and a stop of 0 is an empty range that raises
    # ValueError. So we work out the number of whole percent steps ourselves.
    steps = int(round(max_drop * 100))
    if steps > 0:
        drop = random.randrange(steps) / 100
    else:
        drop = 0

    new_brightness = original_brightness * (1 - drop)

    # Clamp the result so it is always a valid brightness
    if new_brightness < 0:
        return 0
    if new_brightness > 1:
        return 1
    return new_brightness
def Flicker(gap=0.25, candle_color=False, max_brightness_drop=0.35):
    """
    Return a script that runs an endless flicker effect on each device it is
    run against.

    gap
        The number of seconds between each brightness change. Also used as the
        duration/period of each change.

    candle_color
        If true, each change also sets the light to white with a kelvin of
        2500, otherwise only the brightness is changed.

    max_brightness_drop
        The largest fraction of the original brightness that is ever taken
        away.
    """

    async def gen(reference, afr, **kwargs):
        """
        1. Get current brightness. This is our anchor point. All brightness
           changes will be relative to this amount.

        2. Wait a small delay per device, so that the devices are likely out of
           sync

        3. Start a loop from now until the heat death of the universe

           a. Determine a new brightness. This is the original brightness minus
              a percentage that is never more than max_brightness_drop. So if
              max_brightness_drop is 0.35, then the brightness will never be
              more than 35% less than the original value

           b. Send a message to the device setting this brightness. If candle_color
              is true, then we make the light white with a kelvin of 2500

              This message will have a duration equal to the ``gap`` seconds

           c. Wait enough time so that from the point we sent the message to
              the end of the sleep it has been ``gap`` seconds.
        """
        get_color = LightMessages.GetColor()

        # Fall back to full brightness if the device never tells us its state
        original_brightness = 1
        async for pkt, _, _ in afr.transport_target.script(get_color).run_with(
            reference, afr, **kwargs
        ):
            if pkt | LightMessages.LightState:
                original_brightness = pkt.brightness

        # random.randrange only accepts integers: a float stop value is rejected
        # by modern Pythons and a stop of 0 is an empty range that raises
        # ValueError. So we count the hundredths of a second in gap ourselves
        # and skip the delay when there are none.
        delay_steps = int(round(gap * 100))
        start_delay = random.randrange(delay_steps) / 100 if delay_steps > 0 else 0
        if start_delay > 0:
            await asyncio.sleep(start_delay)

        log.info(hp.lc("Starting flicker for device", serial=reference))

        while True:
            start = time.time()
            new_brightness = get_random_brightness(original_brightness, max_brightness_drop)

            if candle_color:
                msg = LightMessages.SetColor(
                    hue=0, saturation=0, brightness=new_brightness, kelvin=2500, duration=gap
                )
            else:
                msg = LightMessages.SetWaveformOptional(
                    brightness=new_brightness, period=gap, cycles=1
                )

            yield msg

            # Only sleep for whatever is left of the gap after sending
            diff = gap - (time.time() - start)
            if diff > 0:
                await asyncio.sleep(diff)

    # Start the gen function for each serial that we find
    return FromGeneratorPerSerial(gen)
@an_action(needs_target=True, special_reference=True)
async def flicker(collector, target, reference, **kwargs):
    """
    Run a flicker animation on one or more devices at the same time.

    To run against all devices on the network with default options::

        ./flicker

    To run against a particular device::

        ./flicker d073d5001337

    Or use the device finder::

        ./flicker match:label=kitchen

    You may specify options, for example::

        ./flicker -- '{"candle_color": true}'

    The options are:

    gap - float - defaults 0.25
        The number of seconds between each brightness change

    candle_color - boolean - defaults false
        If true, we set the light to a candle color, otherwise the effect just
        changes brightness

    max_brightness_drop - float - default 0.35
        The largest fraction of the original brightness (between 0 and 1) that
        the effect will ever take away from the device
    """
    # Spec that turns the extra options into the keyword arguments that the
    # Flicker script takes in
    options_spec = sb.set_options(
        gap=sb.defaulted(sb.float_spec(), 0.25),
        candle_color=sb.optional_spec(sb.boolean()),
        max_brightness_drop=sb.optional_spec(between_0_and_1()),
    )
    extra = collector.configuration["photons_app"].extra_as_json
    normalised = options_spec.normalise(Meta.empty(), extra)

    # Only pass on the options that were specified, so that Flicker's own
    # defaults apply to everything else
    options = {}
    for name, value in normalised.items():
        if value is not sb.NotSpecified:
            options[name] = value

    def log_failure(error):
        """Error catcher that just debug prints errors"""
        log.debug(hp.lc("Failed to send a message", error=error))

    # Start our Flicker script against the devices specified on the command line.
    # gap always has a value because the spec gives it a default
    script = Flicker(**options)
    await target.script(script).run_with_all(
        reference, message_timeout=options["gap"], error_catcher=log_failure
    )
if __name__ == "__main__":
    import sys

    from photons_app.executor import main

    # Hand over to the photons executor, asking for "lan:flicker" followed by
    # whatever arguments were given on the command line
    main(["lan:flicker", *sys.argv[1:]])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment