Skip to content

Instantly share code, notes, and snippets.

@delfick
Last active July 25, 2021 07:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save delfick/d67898c132283439c3787eb5c072ae72 to your computer and use it in GitHub Desktop.
Save delfick/d67898c132283439c3787eb5c072ae72 to your computer and use it in GitHub Desktop.
An example photons script that creates a flicker effect on one or more devices.

To set up, download flicker.py somewhere and then, in that folder, run:

# Using a version of python greater than or equal to 3.6
$ python3 -m venv venv
$ source venv/bin/activate
$ pip install lifx-photons-core
$ python flicker.py

To run against all devices on the network with default options:

python flicker.py

To run against a particular device:

python flicker.py d073d5001337

Or use the device finder:

python flicker.py match:label=kitchen

See https://delfick.github.io/photons-core/tasks.html#references

You may specify options, for example:

python flicker.py -- '{"candle_color": true}'

The options are:

gap - float - defaults 0.25

The number of seconds between each brightness change

candle_color - boolean - defaults false

If true, we set the light to a candle color, otherwise the effect just changes brightness

max_brightness_drop - float between 0 and 1 - default 0.35

Every change is the original brightness minus a percentage. The maximum that percentage can be is this value. So if this is 0.5, then the brightness will never drop below 50% of the original value.

power_on - boolean - default true

Turn the devices on before the animation starts

start_brightness - float between 0 and 1 - default not specified

If specified, we set the initial brightness to this value.

from photons_app.actions import an_action
from photons_app import helpers as hp
from photons_control.planner import plans, Plan, make_plans, Gatherer, Skip
from photons_messages import LightMessages, DeviceMessages, TileMessages
from photons_control.script import FromGeneratorPerSerial
from photons_control.tile import tiles_from
from delfick_project.norms import sb, Meta, BadSpecValue
from delfick_project.addons import addon_hook
import logging
import asyncio
import random
import time
# Module-level logger; everything this script logs is emitted under the "flicker" name
log = logging.getLogger("flicker")
@addon_hook(
    extras=[("lifx.photons", name) for name in ("transport", "messages", "device_finder")]
)
def __lifx__(collector, *args, **kwargs):
    """
    Addon hook for this script.

    The decorator lists the ``lifx.photons`` modules passed as ``extras``
    (presumably the modules photons should load for this script -- confirm
    against the addon_hook documentation). The hook itself does nothing.
    """
class OriginalValue(Plan):
    """
    A plan that reports the power state and brightness of a device.

    The result is a dictionary of ``{"brightness": <number>, "power": <bool>}``.

    Devices that are neither matrix nor multizone are asked for their colour
    and that brightness is used as is. For multizone and matrix products the
    brightness is the average of the brightness of the individual zones.

    Note that the tile this was tested with didn't seem to show much difference
    between 50% and 100% brightness, so the brightness reported for matrix
    devices is capped at 50%.
    """

    @property
    def dependant_info(kls):
        # Ideally there'd be a plans.TileStatePlan to depend on, but that
        # doesn't exist at the moment, so matrix zones are requested in messages
        return {"c": plans.CapabilityPlan(), "z": plans.ZonesPlan()}

    class Instance(Plan.Instance):
        def setup(self):
            """Start with nothing known and seed the zones from the zones plan."""
            self.data = {"brightness": None, "power": None}

            # The zones dependency is Skip when it has no zones to give us
            # (presumably for devices that aren't multizone -- TODO confirm)
            zone_info = self.deps["z"]
            self.zones = (
                [color.brightness for _, color in zone_info] if zone_info != Skip else []
            )
            self.num_zones = len(self.zones)

        @property
        def is_matrix(self):
            """Whether the capability plan says this device has a matrix."""
            return self.deps["c"]["cap"].has_matrix

        @property
        def is_multizone(self):
            """Whether the capability plan says this device is multizone."""
            return self.deps["c"]["cap"].has_multizone

        @property
        def messages(self):
            """
            The messages to send to the device.

            Power is always requested. Matrix devices are also asked for their
            zones and their chain, multizone devices need nothing more because
            the zones came from the dependency, and everything else is asked
            for its colour.
            """
            if self.is_matrix:
                extra = [
                    TileMessages.Get64(tile_index=0, length=5, x=0, y=0, width=8),
                    TileMessages.GetDeviceChain(),
                ]
            elif self.is_multizone:
                extra = []
            else:
                extra = [LightMessages.GetColor()]

            return [DeviceMessages.GetPower(), *extra]

        def process(self, pkt):
            """
            Record what a reply tells us.

            Returns True once both the power and the brightness are known.
            """
            if pkt | DeviceMessages.StatePower:
                self.data["power"] = pkt.level > 0
            elif pkt | LightMessages.LightState:
                self.data["brightness"] = pkt.brightness
            elif pkt | TileMessages.State64:
                self.zones += [color.brightness for color in pkt.colors]
            elif pkt | TileMessages.StateDeviceChain:
                self.num_zones += sum(tile.height * tile.width for tile in tiles_from(pkt))

            # Only average once we have a brightness for every zone we expect
            if 0 < self.num_zones <= len(self.zones):
                self.data["brightness"] = sum(self.zones) / len(self.zones)

            return not any(value is None for value in self.data.values())

        async def info(self):
            """Return the gathered values, capping matrix brightness at 0.5."""
            if self.is_matrix:
                # On my tiles at least there seems little difference between 0.5 and 1
                if self.data["brightness"] > 0.5:
                    self.data["brightness"] = 0.5

            return self.data
class between_0_and_1(sb.Spec):
    """A norms spec for validating a value is a float between 0 and 1 (inclusive)"""

    def normalise_filled(self, meta, val):
        """
        Convert ``val`` with ``sb.float_spec`` and return it if it lies in [0, 1].

        Raises BadSpecValue otherwise. The error carries ``meta`` and the
        offending value so that the user can tell which option was wrong.
        """
        val = sb.float_spec().normalise(meta, val)

        # Written as a chained comparison so that NaN is rejected too
        if not 0 <= val <= 1:
            raise BadSpecValue("Number must be between 0 and 1", got=val, meta=meta)

        return val
def get_random_brightness(original_brightness, max_drop):
    """
    Return a new brightness between 0 and 1 that is the original brightness
    minus a random percentage that is never more than max_drop.

    original_brightness
        The brightness all changes are relative to.

    max_drop
        The largest fraction (0 to 1) of the original brightness that may be
        taken away. A value of 0 (or less) means the brightness never drops.

    The drop is chosen in steps of 1%, from 0% up to and including max_drop.
    """
    # random.randrange only accepts integers (floats are a TypeError from
    # python 3.12, and non-integral floats were always a ValueError), so turn
    # the fraction into a whole number of percent before choosing.
    # randint is inclusive, which also makes a max_drop of 0 valid.
    max_percent = max(0, int(round(max_drop * 100)))
    drop = random.randint(0, max_percent) / 100

    new_brightness = original_brightness * (1 - drop)

    # Clamp to the range a device accepts
    if new_brightness < 0:
        return 0
    if new_brightness > 1:
        return 1
    return new_brightness
def Flicker(
    gap=0.25, candle_color=False, max_brightness_drop=0.35, power_on=True, start_brightness=None
):
    """
    Return a script that runs the flicker effect against each serial it is
    given, one independent generator per serial.

    gap
        Seconds between each brightness change

    candle_color
        If true, each change also sets the light to white with a kelvin of 2500

    max_brightness_drop
        The largest fraction of the original brightness a change may remove

    power_on
        Turn the device on before the effect starts

    start_brightness
        If not None, set the device to this brightness before the effect starts
    """

    async def gen(reference, afr, **kwargs):
        """
        1. Get current brightness. This is our anchor point. All brightness
           changes will be relative to this amount.

        2. Wait a small delay per device, so that the devices are likely out of
           sync

        3. Start a loop from now until the heat death of the universe

           a. Determine a new brightness. This is the original brightness minus
              a percentage that is never more than max_brightness_drop. So if
              max_brightness_drop is 0.35, then the brightness will never be
              more than 35% less than the original value

           b. Send a message to the device setting this brightness. If candle_color
              is true, then we make the light white with a kelvin of 2500

              This message will have a duration equal to the ``gap`` seconds

           c. Wait enough time so that from the point we sent the message to
              the end of the sleep it has been ``gap`` seconds.
        """
        if power_on:
            t = yield DeviceMessages.SetPower(level=65535, res_required=False)

            # Give the power time to set
            await t
            await asyncio.sleep(0.4)

        if start_brightness is not None:
            yield LightMessages.SetWaveformOptional(brightness=start_brightness, res_required=False)

        # Find the original brightness
        # And whether light is on or off
        # Devices that are off will not have the effect run against them
        # (named so that it doesn't shadow the imported ``plans`` module)
        wanted_plans = make_plans("capability", original=OriginalValue())
        gathered = await Gatherer(afr.transport_target).gather_all(
            wanted_plans, reference, afr, **{**kwargs, "message_timeout": 5}
        )

        # Defaults used when the device didn't give us an answer
        product = None
        original_brightness = 1

        if reference not in gathered:
            log.warning(hp.lc("Failed to gather original values from device", serial=reference))
        else:
            info = gathered[reference][1]

            if "original" in info:
                if not info["original"]["power"]:
                    log.info(hp.lc("Device is turned off, will ignore", serial=reference))
                    return

                original_brightness = info["original"]["brightness"]

            if "capability" in info:
                product = info["capability"]["cap"].product.name

        # Make sure the different lights aren't in sync to make the effect better
        # random.uniform is used because random.randrange rejects float bounds
        # (TypeError from python 3.12) and an empty range when gap is 0
        start_delay = random.uniform(0, gap) if gap > 0 else 0
        if start_delay > 0:
            await asyncio.sleep(start_delay)

        log.info(
            hp.lc(
                "Starting flicker for device",
                serial=reference,
                product=product,
                original_brightness=float(f"{original_brightness:0.02f}"),
            )
        )

        while True:
            new_brightness = get_random_brightness(original_brightness, max_brightness_drop)

            if candle_color:
                msg = LightMessages.SetColor(
                    hue=0,
                    saturation=0,
                    brightness=new_brightness,
                    kelvin=2500,
                    duration=gap,
                    res_required=False,
                )
            else:
                msg = LightMessages.SetWaveformOptional(
                    brightness=new_brightness, period=gap, cycles=1, res_required=False
                )

            # A monotonic clock so that system clock adjustments can't distort the wait
            started = time.monotonic()

            # Send the message to the device
            yield msg

            # Wait till the next time we should send a message
            remaining = gap - (time.monotonic() - started)
            if remaining > 0:
                await asyncio.sleep(remaining)

    # Return a script that starts the gen function for each serial that we find
    return FromGeneratorPerSerial(gen)
@an_action(needs_target=True, special_reference=True)
async def flicker(collector, target, reference, **kwargs):
    """
    Run a flicker animation on one or more devices at the same time.

    To run against all devices on the network with default options::

        ./flicker

    To run against a particular device::

        ./flicker d073d5001337

    Or use the device finder::

        ./flicker match:label=kitchen

    You may specify options, for example::

        ./flicker -- '{"candle_color": true}'

    The options are:

    gap - float - defaults 0.25
        The number of seconds between each brightness change

    candle_color - boolean - defaults false
        If true, we set the light to a candle color, otherwise the effect just
        changes brightness

    max_brightness_drop - float between 0 and 1 - default 0.35
        Every change is the original brightness minus a percentage. This is
        the largest that percentage can be, so with 0.5 the brightness never
        drops below 50% of the original value.

    power_on - boolean - default true
        Turn the device on before flicker starts

    start_brightness - float between 0 and 1 - default not specified
        If specified, we set the initial brightness to this value.
    """
    # Create a spec for turning extra options into the keyword arguments that
    # the flicker script takes in
    spec = sb.set_options(
        gap=sb.defaulted(sb.float_spec(), 0.25),
        candle_color=sb.optional_spec(sb.boolean()),
        power_on=sb.optional_spec(sb.boolean()),
        max_brightness_drop=sb.optional_spec(between_0_and_1()),
        start_brightness=sb.optional_spec(between_0_and_1()),
    )
    extra = collector.configuration["photons_app"].extra_as_json

    # Create the options for instantiating Flicker with. We include only the
    # options that were specified so that Flicker's own defaults apply to the
    # rest. ``gap`` is always present because its spec is defaulted.
    # (Kept in its own name rather than rebinding this function's ``kwargs``)
    normalised = spec.normalise(Meta.empty(), extra)
    options = {name: value for name, value in normalised.items() if value is not sb.NotSpecified}

    def e(error):
        """Error catcher that just debug prints errors"""
        log.debug(hp.lc("Failed to send a message", error=error))

    # Start our Flicker script against the devices specified on the command line.
    # A message gets ``gap`` seconds before it times out
    await target.script(Flicker(**options)).run_with_all(
        reference, message_timeout=options["gap"], error_catcher=e
    )
if __name__ == "__main__":
    # When run as a script, hand over to the photons executor asking for
    # "lan:flicker" followed by whatever arguments were given on the command line
    import sys

    from photons_app.executor import main

    main(["lan:flicker", *sys.argv[1:]])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment