Skip to content

Instantly share code, notes, and snippets.

@delfick

delfick/weird.py Secret

Last active April 10, 2021 21:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save delfick/5252bdd90d05e771dcfaa795ea8388bf to your computer and use it in GitHub Desktop.
Save delfick/5252bdd90d05e771dcfaa795ea8388bf to your computer and use it in GitHub Desktop.
from photons_app.tasks import task_register as task
from photons_app.errors import UserQuit
from photons_messages import LightMessages, DeviceMessages, Services
from delfick_project.addons import addon_hook
import logging
import time
log = logging.getLogger("weird")
@task
class weird(task.Task):
target = task.requires_target()
async def execute_task(self, **kwargs):
async with self.target.session() as sender:
# Photons uses GetService and does some tricky stuff with the retries when it comes to discovery
# https://github.com/delfick/photons/blob/main/modules/photons_transport/session/network.py#L80
# https://github.com/delfick/photons/blob/main/modules/photons_transport/comms/base.py#L345
# GetService means you also know the port. In practise though it is always 56700
# GetVersion is better than GetColor though because the StateVersion lets you know what what each device is
# Also, GetColor will return different things based on whether it's a switch or not, because switches
# don't support light messages because it's not a light!
#
# For this I'll try and replicate what you say you're doing, but it will discover less than it
# should if your network has enough lights that are being slow
found = []
async for pkt in sender(LightMessages.GetColor(), broadcast=True, no_retry=True):
if pkt | DeviceMessages.StateUnhandled or pkt | LightMessages.LightState:
found.append(pkt.serial)
addr = pkt.Information.remote_addr
await sender.add_service(pkt.serial, Services.UDP, host=addr[0], port=56700)
while True:
choice = self.choose(found)
if choice is not None:
await self.power(sender, *choice)
def choose(self, found):
print()
print("-" * 10)
for i, serial in enumerate(found):
print(f"{i}: {serial}")
try:
chosen = input("Which device do you want to send power to? ")
except EOFError:
raise UserQuit()
if not chosen or not chosen.isdigit():
print(f"Need to choose a number between 0 and {len(found)-1}")
return
chosen = int(chosen)
if chosen < 0 or chosen >= len(found):
print(f"Need to choose a number between 0 and {len(found)-1}")
return
serial = found[chosen]
try:
on_or_off = input("type 'on' or anything else for off: ")
except EOFError:
raise UserQuit()
level = 65535 if on_or_off == "on" else 0
return serial, level
async def power(self, sender, serial, level):
start = time.time()
errors = []
await sender(DeviceMessages.SetPower(level=level), serial, error_catcher=errors, no_retry=True)
error = "with no errors" if len(errors) == 0 else f"with an error: {errors}"
print(f"Got reply in {time.time() - start:.2f} seconds {error}")
if __name__ == "__main__":
__import__("photons_core").run("lan:weird {@:1:}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment