Skip to content

Instantly share code, notes, and snippets.

@delfick
Last active April 13, 2020 13:04
Show Gist options
  • Save delfick/bd25f070775fc24d82f02bcf4cc09536 to your computer and use it in GitHub Desktop.
Save delfick/bd25f070775fc24d82f02bcf4cc09536 to your computer and use it in GitHub Desktop.
from photons_app.actions import an_action
from photons_control.script import FromGenerator
from photons_messages import LightMessages
import logging
import asyncio
import time
log = logging.getLogger("red")
async def iterate(end_after=100, step=5):
    """Yield a countdown of remaining time, spacing values ``step`` seconds apart.

    For each ``i`` in ``range(0, end_after, step)`` this yields
    ``end_after - i``. When the consumer resumes the generator, it sleeps for
    whatever remains of ``step`` seconds after subtracting the time the
    consumer spent handling the value. The same wait happens after the final
    value, so the generator finishes about ``step`` seconds after yielding it.

    Args:
        end_after: Value the countdown starts from; also the exclusive upper
            bound of the underlying ``range``.
        step: Decrement between yielded values and the target number of
            seconds between them. Must be a non-zero integer, as required by
            ``range``.

    Yields:
        The remaining amount, ``end_after - i``.
    """
    for offset in range(0, end_after, step):
        # Measure elapsed time with the monotonic clock: the wall clock can be
        # adjusted while we are suspended, which would distort the sleep.
        started = time.monotonic()
        yield end_after - offset
        remaining = step - (time.monotonic() - started)
        if remaining > 0:
            await asyncio.sleep(remaining)
@an_action(needs_target=True, needs_reference=True, special_reference=True)
async def red(collector, target, reference, **kwargs):
    """Run a countdown that queries color and sends a red waveform each step.

    For every value produced by ``iterate()`` the inner generator prints the
    time left, yields a ``GetColor`` message and then a
    ``SetWaveformOptional`` message (hue 0, saturation 1, one cycle) whose
    period is the time left. Each packet received from ``target.send`` that
    matches ``LightState`` has its serial and saturation printed.

    Args:
        collector: Not used in this function.
        target: Object whose ``send`` method is given the generated messages.
        reference: Passed straight through to ``target.send``.
        **kwargs: Accepted and ignored.
    """

    def log_error(err):
        # Handed to target.send as error_catcher; just logs what it receives.
        log.error(err)

    async def countdown_messages(reference, sender, **kwargs):
        async for time_left in iterate():
            print("TIME LEFT", time_left)

            # NOTE(review): whatever is sent back into the generator for a
            # yielded message is awaited; presumably it completes once that
            # message has been handled -- confirm against FromGenerator docs.
            color_query_done = yield LightMessages.GetColor()
            await color_query_done

            waveform = LightMessages.SetWaveformOptional(
                hue=0, saturation=1, cycles=1, period=time_left, res_required=False
            )
            waveform_done = yield waveform
            await waveform_done

    script = FromGenerator(countdown_messages, reference_override=True)

    async for reply in target.send(script, reference, error_catcher=log_error):
        # NOTE(review): `|` appears to be the packet-type match in photons --
        # confirm. Anything that is not a LightState is skipped.
        if not (reply | LightMessages.LightState):
            continue
        print(reply.serial, "sat: ", reply.saturation)
if __name__ == "__main__":
    # Script entry point: hand the "lan:red" command line to photons' CLI
    # runner. NOTE(review): "{@:1:}" is presumably a placeholder that forwards
    # the remaining command-line arguments -- confirm against run_cli docs.
    import photons_core

    photons_core.run_cli("lan:red {@:1:}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment