-
-
Save delfick/5252bdd90d05e771dcfaa795ea8388bf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from photons_app.tasks import task_register as task | |
from photons_app.errors import UserQuit | |
from photons_messages import LightMessages, DeviceMessages, Services | |
from delfick_project.addons import addon_hook | |
import logging | |
import time | |
log = logging.getLogger("weird") | |
@task | |
class weird(task.Task): | |
target = task.requires_target() | |
async def execute_task(self, **kwargs): | |
async with self.target.session() as sender: | |
# Photons uses GetService and does some tricky stuff with the retries when it comes to discovery | |
# https://github.com/delfick/photons/blob/main/modules/photons_transport/session/network.py#L80 | |
# https://github.com/delfick/photons/blob/main/modules/photons_transport/comms/base.py#L345 | |
# GetService means you also know the port. In practise though it is always 56700 | |
# GetVersion is better than GetColor though because the StateVersion lets you know what what each device is | |
# Also, GetColor will return different things based on whether it's a switch or not, because switches | |
# don't support light messages because it's not a light! | |
# | |
# For this I'll try and replicate what you say you're doing, but it will discover less than it | |
# should if your network has enough lights that are being slow | |
found = [] | |
async for pkt in sender(LightMessages.GetColor(), broadcast=True, no_retry=True): | |
if pkt | DeviceMessages.StateUnhandled or pkt | LightMessages.LightState: | |
found.append(pkt.serial) | |
addr = pkt.Information.remote_addr | |
await sender.add_service(pkt.serial, Services.UDP, host=addr[0], port=56700) | |
while True: | |
choice = self.choose(found) | |
if choice is not None: | |
await self.power(sender, *choice) | |
def choose(self, found): | |
print() | |
print("-" * 10) | |
for i, serial in enumerate(found): | |
print(f"{i}: {serial}") | |
try: | |
chosen = input("Which device do you want to send power to? ") | |
except EOFError: | |
raise UserQuit() | |
if not chosen or not chosen.isdigit(): | |
print(f"Need to choose a number between 0 and {len(found)-1}") | |
return | |
chosen = int(chosen) | |
if chosen < 0 or chosen >= len(found): | |
print(f"Need to choose a number between 0 and {len(found)-1}") | |
return | |
serial = found[chosen] | |
try: | |
on_or_off = input("type 'on' or anything else for off: ") | |
except EOFError: | |
raise UserQuit() | |
level = 65535 if on_or_off == "on" else 0 | |
return serial, level | |
async def power(self, sender, serial, level): | |
start = time.time() | |
errors = [] | |
await sender(DeviceMessages.SetPower(level=level), serial, error_catcher=errors, no_retry=True) | |
error = "with no errors" if len(errors) == 0 else f"with an error: {errors}" | |
print(f"Got reply in {time.time() - start:.2f} seconds {error}") | |
if __name__ == "__main__": | |
__import__("photons_core").run("lan:weird {@:1:}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment