Last active
June 13, 2022 15:58
-
-
Save weargoggles/b789d21853a223d01a8fab1a5e5b2166 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import logging | |
import subprocess | |
import httpx | |
from aiozeroconf import ServiceBrowser, Zeroconf | |
# Log everything from DEBUG upwards, prefixing each record with its timestamp.
logging.basicConfig(
    format="%(asctime)s: %(message)s",
    level=logging.DEBUG,
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# These names and markers were found by trial and error: watching the logs on
# several macOS versions with several webcams. If none of them match on your
# machine, run `log stream` while toggling the camera and add what you see.

# Names of the camera daemons whose log output is watched.
PROCESS_NAMES = {
    "AppleCameraAssistant",
    "UVCAssistant",
    "VDCAssistant",
    "appleh13camerad",
}

# (start marker, stop marker) pairs; a log line containing a marker signals
# the corresponding camera transition.
_EVENT_PAIRS = (
    ("StartHardwareStream", "StopHardwareStream"),
    ("kCameraStreamStart", "kCameraStreamStop"),
    ('"VDCAssistant_Power_State" = On;', '"VDCAssistant_Power_State" = Off;'),
    ("StartStream", "StopStream"),
)

# Substrings meaning "the camera was switched on".
START_EVENTS = {start for start, _ in _EVENT_PAIRS}

# Substrings meaning "the camera was switched off".
END_EVENTS = {stop for _, stop in _EVENT_PAIRS}
async def do_close(zc):
    """Await ``zc.close()`` and discard whatever it returns."""
    pending_close = zc.close()
    await pending_close
class LightController:
    """Zeroconf listener that tracks discovered lights and toggles them over HTTP.

    An instance is handed to a ``ServiceBrowser`` as its ``listener``; the
    browser then calls :meth:`add_service` and :meth:`remove_service`.

    Attributes:
        lights: Maps a light's display name to a dict with the keys
            ``"info"`` (the resolved zeroconf service info) and
            ``"accessory_info"`` (the JSON returned by the light).
        name_lut: Maps a zeroconf service name to the display name stored
            in ``lights``.
    """

    def __init__(self, *args, **kwargs):
        self.lights = {}
        self.name_lut = {}
        # Strong references to in-flight discovery tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._pending = set()

    def remove_service(self, zeroconf, type_, name):
        """Forget the light that was registered under service *name*, if any."""
        print("Service %s removed" % (name,))
        display_name = self.name_lut.pop(name, None)
        if display_name is not None:
            # Two service names can resolve to the same display name, so the
            # light may already be gone; tolerate that instead of raising.
            self.lights.pop(display_name, None)

    def add_service(self, zeroconf, type_, name):
        """Schedule :meth:`found_service` for a newly announced service."""
        task = asyncio.ensure_future(self.found_service(zeroconf, type_, name))
        self._pending.add(task)
        task.add_done_callback(self._pending.discard)

    async def found_service(self, zeroconf, type_, name):
        """Resolve the service, fetch its accessory info and register the light.

        Services that cannot be resolved, cannot be reached, or do not answer
        with the expected JSON are logged and skipped.
        """
        info = await zeroconf.get_service_info(type_, name)
        if info is None:
            # NOTE(review): get_service_info appears to return None when the
            # lookup times out -- confirm against the aiozeroconf version used.
            logger.warning("Could not resolve service %s", name)
            return
        url = f"http://{info.server}:{info.port}/elgato/accessory-info"
        try:
            async with httpx.AsyncClient() as client:
                res = await client.get(url, headers={"Accept": "application/json"})
            accessory_info = res.json()
            display_name = accessory_info['displayName']
        except (httpx.HTTPError, ValueError, KeyError) as exc:
            # This runs as a fire-and-forget task: report the problem here
            # rather than leaving an exception nobody retrieves.
            logger.warning("Ignoring service %s: %r", name, exc)
            return
        print(f"Found light: {display_name}, {info}, {accessory_info}")
        self.lights[display_name] = {"info": info, "accessory_info": accessory_info}
        self.name_lut[name] = display_name

    async def _set_power(self, light_name, on):
        """PUT the on/off state to the light called *light_name*.

        Does nothing when the light is unknown. Request failures are logged
        and not raised, so one unreachable light cannot break the caller.
        """
        light = self.lights.get(light_name)
        if light is None:
            return
        info = light["info"]
        url = f"http://{info.server}:{info.port}/elgato/lights"
        try:
            async with httpx.AsyncClient() as client:
                await client.put(url, json={"lights": [{"on": 1 if on else 0}]})
        except httpx.HTTPError as exc:
            logger.warning("Could not switch light %s: %r", light_name, exc)

    async def switch_on(self, light_name):
        """Turn on the light with display name *light_name* (no-op if unknown)."""
        await self._set_power(light_name, True)

    async def switch_off(self, light_name):
        """Turn off the light with display name *light_name* (no-op if unknown)."""
        await self._set_power(light_name, False)
def get_pids(process_names=None):
    """Yield the PID (as a string) of every running process matched by ``pgrep``.

    Args:
        process_names: Iterable of names to look up with ``pgrep``. Defaults
            to the module-level ``PROCESS_NAMES``.

    Yields:
        One PID string per matching process. Names with no match, and a
        missing or unrunnable ``pgrep``, contribute nothing.
    """
    if process_names is None:
        process_names = PROCESS_NAMES
    for process_name in process_names:
        try:
            # An argument list avoids passing the name through a shell.
            output = subprocess.check_output(["pgrep", process_name], text=True)
        except (subprocess.CalledProcessError, OSError):
            # pgrep exits non-zero when nothing matches; OSError covers pgrep
            # being absent, which the shell-based call reported the same way.
            continue
        # pgrep prints one PID per line; yield them individually so a name
        # with several matches does not produce a PID with embedded newlines.
        yield from output.split()
async def _call_handler(handler, light_controller):
    """Await ``handler(light_controller)``, logging any failure instead of raising."""
    try:
        await handler(light_controller)
    except asyncio.CancelledError:
        # Never swallow cancellation of the watcher itself.
        raise
    except Exception:
        logger.exception("Camera event handler failed")


async def stream_logs(light_controller, handle_camera_on, handle_camera_off):
    """Follow ``log stream`` for the camera processes and fire on/off handlers.

    Args:
        light_controller: Passed unchanged to both handlers.
        handle_camera_on: Coroutine function awaited with ``light_controller``
            when a start marker is seen while the camera is considered off.
        handle_camera_off: Coroutine function awaited with ``light_controller``
            when a stop marker is seen while the camera is considered on.

    Returns immediately, after logging an error, when no camera process is
    found. Otherwise runs until the ``log stream`` output ends or the task is
    cancelled; the child process is terminated on the way out.
    """
    camera_is_on = False
    # Split each entry so a multi-line pgrep result becomes separate PIDs.
    pids = {pid for entry in get_pids() for pid in entry.split()}
    if not pids:
        logger.error("No pids to watch")
        return

    # Build an argv list rather than a shell string: nothing to quote.
    cmd = ["log", "stream"]
    for pid in pids:
        cmd += ["--process", pid]

    logger.info("Watching logs")
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    try:
        async for line_bytes in proc.stdout:
            # A stray undecodable byte in the log must not stop the watcher.
            line = line_bytes.decode(errors="replace")
            is_start = any(event in line for event in START_EVENTS)
            is_end = any(event in line for event in END_EVENTS)
            if not camera_is_on and is_start:
                logger.info("Camera started")
                camera_is_on = True
                await _call_handler(handle_camera_on, light_controller)
            if camera_is_on and is_end:
                logger.info("Camera stopped")
                camera_is_on = False
                await _call_handler(handle_camera_off, light_controller)
    finally:
        # Do not leave `log stream` running after cancellation or an error.
        if proc.returncode is None:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass
            await proc.wait()
async def handle_on(light_controller, light_name="desk"):
    """Switch on the light called *light_name* through *light_controller*.

    Args:
        light_controller: Object exposing an awaitable ``switch_on(name)``.
        light_name: Display name of the light to switch; defaults to
            ``"desk"``.
    """
    await light_controller.switch_on(light_name)
async def handle_off(light_controller, light_name="desk"):
    """Switch off the light called *light_name* through *light_controller*.

    Args:
        light_controller: Object exposing an awaitable ``switch_off(name)``.
        light_name: Display name of the light to switch; defaults to
            ``"desk"``.
    """
    await light_controller.switch_off(light_name)
# Script entry: discover lights over zeroconf and watch the camera logs until
# interrupted with Ctrl-C.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
zeroconf = Zeroconf(loop)
controller = LightController()
# The browser reports "_elg._tcp.local." services to the controller.
browser = ServiceBrowser(zeroconf, "_elg._tcp.local.", listener=controller)
# Keep a handle on the watcher so it can be cancelled cleanly on shutdown.
log_task = loop.create_task(
    stream_logs(controller, handle_camera_on=handle_on, handle_camera_off=handle_off)
)
try:
    loop.run_forever()
except KeyboardInterrupt:
    print("Unregistering...")
finally:
    try:
        # Cancel the watcher and let it unwind before the loop is closed;
        # otherwise it is destroyed while still pending.
        log_task.cancel()
        loop.run_until_complete(asyncio.gather(log_task, return_exceptions=True))
        loop.run_until_complete(do_close(zeroconf))
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment