|
from photons_app.actions import an_action |
|
from photons_app import helpers as hp |
|
|
|
from photons_control.planner import plans, Plan, make_plans, Gatherer, Skip |
|
from photons_messages import LightMessages, DeviceMessages, TileMessages |
|
from photons_control.script import FromGeneratorPerSerial |
|
from photons_control.tile import tiles_from |
|
|
|
from delfick_project.norms import sb, Meta, BadSpecValue |
|
from delfick_project.addons import addon_hook |
|
import logging |
|
import asyncio |
|
import random |
|
import time |
|
|
|
# Module level logger used by everything in this script
log = logging.getLogger("flicker")
|
|
|
|
|
@addon_hook(
    extras=[
        ("lifx.photons", module_name)
        for module_name in ("transport", "messages", "device_finder")
    ]
)
def __lifx__(collector, *args, **kwargs):
    """
    Addon hook for this script.

    It does nothing itself; it exists so that the ``extras`` listed in the
    decorator are declared for this module.
    """
|
|
|
|
|
class OriginalValue(Plan):
    """
    A plan that reports the power state and brightness a device currently has.

    Multizone and matrix products do not have a single brightness, so the
    brightness of the individual zones is averaged instead.

    The tile this was tested with showed little visible difference between 50%
    and 100% brightness, so the brightness reported for matrix devices is
    capped at 50%.
    """

    @property
    def dependant_info(kls):
        # A plans.TileStatePlan would be nicer, but it doesn't exist at the
        # moment, so the tile state is requested by hand in Instance.messages
        return {"c": plans.CapabilityPlan(), "z": plans.ZonesPlan()}

    class Instance(Plan.Instance):
        def setup(self):
            """Prepare the result and seed the zones from the zones plan, if it ran"""
            # process() reports completion once neither of these is None
            self.data = {"brightness": None, "power": None}

            zone_info = self.deps["z"]
            self.zones = (
                [zone_color.brightness for _, zone_color in zone_info] if zone_info != Skip else []
            )

            # For matrix devices this is increased later from the device chain
            self.num_zones = len(self.zones)

        @property
        def is_matrix(self):
            """Whether the capability plan says this device has a matrix"""
            capability = self.deps["c"]["cap"]
            return capability.has_matrix

        @property
        def is_multizone(self):
            """Whether the capability plan says this device has multiple zones"""
            capability = self.deps["c"]["cap"]
            return capability.has_multizone

        @property
        def messages(self):
            """
            The messages to send to the device.

            Power is always requested. Matrix devices are also asked for tile
            colors and the device chain, and devices that are neither matrix
            nor multizone are asked for their color.
            """
            get_power = DeviceMessages.GetPower()

            if self.is_matrix:
                return [
                    get_power,
                    TileMessages.Get64(tile_index=0, length=5, x=0, y=0, width=8),
                    TileMessages.GetDeviceChain(),
                ]

            if self.is_multizone:
                return [get_power]

            return [get_power, LightMessages.GetColor()]

        def process(self, pkt):
            """
            Record what this reply tells us and return True once both the
            power and the brightness are known.
            """
            if pkt | DeviceMessages.StatePower:
                self.data["power"] = pkt.level > 0

            elif pkt | LightMessages.LightState:
                self.data["brightness"] = pkt.brightness

            elif pkt | TileMessages.State64:
                self.zones.extend(color.brightness for color in pkt.colors)

            elif pkt | TileMessages.StateDeviceChain:
                self.num_zones += sum(tile.height * tile.width for tile in tiles_from(pkt))

            # Only average once we have at least as many zones as we expect
            have_all_zones = self.num_zones > 0 and len(self.zones) >= self.num_zones
            if have_all_zones:
                self.data["brightness"] = sum(self.zones) / len(self.zones)

            return not any(value is None for value in self.data.values())

        async def info(self):
            """Return the gathered data, capping the brightness of matrix devices at 0.5"""
            # On my tiles at least there seems little difference between 0.5 and 1
            too_bright = self.is_matrix and self.data["brightness"] > 0.5
            if too_bright:
                self.data["brightness"] = 0.5
            return self.data
|
|
|
|
|
class between_0_and_1(sb.Spec):
    """A norms spec for validating a value is a float between 0 and 1 (inclusive)"""

    def normalise_filled(self, meta, val):
        """
        Normalise ``val`` with ``sb.float_spec`` and return it.

        Raises ``BadSpecValue`` when the number is below 0 or above 1. The
        ``meta`` and the offending value are attached to the error so that it
        identifies which option was wrong and what was supplied.
        """
        val = sb.float_spec().normalise(meta, val)
        if val < 0 or val > 1:
            raise BadSpecValue("Number must be between 0 and 1", got=val, meta=meta)
        return val
|
|
|
|
|
def get_random_brightness(original_brightness, max_drop):
    """
    Return a new brightness between 0 and 1 that is the original brightness
    minus a percentage that is never more than max_drop

    The drop is chosen in whole percent steps from 0 up to, but not including,
    ``max_drop``. A ``max_drop`` that rounds to less than one percent means the
    brightness is not dropped at all.
    """
    # randrange only accepts integers (floats are a TypeError from Python 3.12)
    # and refuses an empty range, so work in whole percents and special case
    # the "no drop" situation
    steps = round(max_drop * 100)
    drop = random.randrange(steps) / 100 if steps > 0 else 0

    new_brightness = original_brightness * (1 - drop)

    # Clamp to what a device accepts
    return min(1, max(0, new_brightness))
|
|
|
|
|
def Flicker(
    gap=0.25, candle_color=False, max_brightness_drop=0.35, power_on=True, start_brightness=None
):
    """
    Create a script that runs a flicker effect on each device it is run against.

    gap
        Seconds between each brightness change. Also used as the duration of
        each change.

    candle_color
        When true, each change also makes the light white with a kelvin of
        2500. Otherwise only the brightness is changed.

    max_brightness_drop
        The largest fraction of the original brightness that a change may
        remove.

    power_on
        When true, the device is turned on before the effect starts.

    start_brightness
        When not None, the brightness is set to this value before the original
        brightness is read from the device.

    Returns a ``FromGeneratorPerSerial`` that starts ``gen`` for each serial.
    """

    async def gen(reference, afr, **kwargs):
        """
        1. Get current brightness. This is our anchor point. All brightness
           changes will be relative to this amount.
        2. Wait a small delay per device, so that the devices are likely out of
           sync
        3. Start a loop from now until the heat death of the universe
            a. Determine a new brightness. This is the original brightness minus
               a percentage that is never more than max_brightness_drop. So if
               max_brightness_drop is 0.35, then the brightness will never be
               more than 35% less than the original value
            b. Send a message to the device setting this brightness. If candle_color
               is true, then we make the light white with a kelvin of 2500

               This message will have a duration equal to the ``gap`` seconds
            c. Wait enough time so that from the point we sent the message to
               the end of the sleep it has been ``gap`` seconds.
        """
        if power_on:
            t = yield DeviceMessages.SetPower(level=65535, res_required=False)

            # Give the power time to set
            await t
            await asyncio.sleep(0.4)

        if start_brightness is not None:
            yield LightMessages.SetWaveformOptional(brightness=start_brightness, res_required=False)

        # Find the original brightness
        # And whether light is on or off
        # Devices that are off will not have the effect run against them
        # (named so that it does not shadow the imported ``plans`` module)
        wanted_plans = make_plans("capability", original=OriginalValue())
        gathered = await Gatherer(afr.transport_target).gather_all(
            wanted_plans, reference, afr, **{**kwargs, "message_timeout": 5}
        )

        product = None
        original_brightness = 1

        if reference not in gathered:
            log.warning(hp.lc("Failed to gather original values from device", serial=reference))
        else:
            info = gathered[reference][1]

            if "original" in info:
                if not info["original"]["power"]:
                    log.info(hp.lc("Device is turned off, will ignore", serial=reference))
                    return
                original_brightness = info["original"]["brightness"]

            if "capability" in info:
                product = info["capability"]["cap"].product.name

        # Make sure the different lights aren't in sync to make the effect better
        # random.uniform is happy with floats, unlike random.randrange, and the
        # guard means a gap of zero (or less) simply has no start delay
        start_delay = random.uniform(0, gap) if gap > 0 else 0
        if start_delay > 0:
            await asyncio.sleep(start_delay)

        log.info(
            hp.lc(
                "Starting flicker for device",
                serial=reference,
                product=product,
                original_brightness=float(f"{original_brightness:0.02f}"),
            )
        )

        while True:
            new_brightness = get_random_brightness(original_brightness, max_brightness_drop)

            if candle_color:
                msg = LightMessages.SetColor(
                    hue=0,
                    saturation=0,
                    brightness=new_brightness,
                    kelvin=2500,
                    duration=gap,
                    res_required=False,
                )
            else:
                msg = LightMessages.SetWaveformOptional(
                    brightness=new_brightness, period=gap, cycles=1, res_required=False
                )

            # A monotonic clock is not affected by changes to the system time
            start = time.monotonic()

            # Send the message to the device
            yield msg

            # Wait till the next time we should send a message
            diff = gap - (time.monotonic() - start)
            if diff > 0:
                await asyncio.sleep(diff)

    # Return a script that starts the gen function for each serial that we find
    return FromGeneratorPerSerial(gen)
|
|
|
|
|
@an_action(needs_target=True, special_reference=True)
async def flicker(collector, target, reference, **kwargs):
    """
    Run a flicker animation on one or more devices at the same time.

    To run against all devices on the network with default options::

        ./flicker

    To run against a particular device::

        ./flicker d073d5001337

    Or use the device finder::

        ./flicker match:label=kitchen

    You may specify options, for example::

        ./flicker -- '{"candle_color": true}'

    The options are:

    gap - float - defaults 0.25
        The number of seconds between each brightness change

    candle_color - boolean - defaults false
        If true, we set the light to a candle color, otherwise the effect just
        changes brightness

    max_brightness_drop - float between 0 and 1 - default 0.35
        The largest fraction of the original brightness that a change will
        ever take away from the device

    power_on - boolean - default true
        Turn the device on before flicker starts

    start_brightness - float between 0 and 1 - default not specified
        If specified, we set the initial brightness to this value.
    """

    # The spec that turns the extra options from the command line into the
    # keyword arguments that Flicker takes in
    options_spec = sb.set_options(
        gap=sb.defaulted(sb.float_spec(), 0.25),
        candle_color=sb.optional_spec(sb.boolean()),
        power_on=sb.optional_spec(sb.boolean()),
        max_brightness_drop=sb.optional_spec(between_0_and_1()),
        start_brightness=sb.optional_spec(between_0_and_1()),
    )
    given = collector.configuration["photons_app"].extra_as_json
    normalised = options_spec.normalise(Meta.empty(), given)

    # Only pass on the options that were specified, so that Flicker uses its
    # own defaults for the rest
    options = {}
    for name, value in normalised.items():
        if value is not sb.NotSpecified:
            options[name] = value

    def log_failure(error):
        """Error catcher that just debug prints errors"""
        log.debug(hp.lc("Failed to send a message", error=error))

    # Start our Flicker script against the devices specified on the command line.
    script = target.script(Flicker(**options))
    await script.run_with_all(
        reference, message_timeout=options["gap"], error_catcher=log_failure
    )
|
|
|
|
|
# When executed directly, run the flicker action against the lan target,
# passing along any command line arguments
if __name__ == "__main__":
    import sys

    from photons_app.executor import main

    main(["lan:flicker", *sys.argv[1:]])