Skip to content

Instantly share code, notes, and snippets.

@weargoggles
Last active June 13, 2022 15:58
Show Gist options
  • Save weargoggles/b789d21853a223d01a8fab1a5e5b2166 to your computer and use it in GitHub Desktop.
Save weargoggles/b789d21853a223d01a8fab1a5e5b2166 to your computer and use it in GitHub Desktop.
import asyncio
import logging
import subprocess
import httpx
from aiozeroconf import ServiceBrowser, Zeroconf
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)
# The following have been discovered through trial and error, watching logs, on
# several versions of macOS, and using several webcams. If they don't work, try
# watching the output of `log stream` while you turn your camera off and on.
PROCESS_NAMES = {
"VDCAssistant",
"AppleCameraAssistant",
"UVCAssistant",
"appleh13camerad",
}
START_EVENTS = {
"StartHardwareStream",
"kCameraStreamStart",
'"VDCAssistant_Power_State" = On;',
"StartStream",
}
END_EVENTS = {
"StopHardwareStream",
"kCameraStreamStop",
'"VDCAssistant_Power_State" = Off;',
"StopStream",
}
async def do_close(zc):
await zc.close()
class LightController(object):
def __init__(self, *args, **kwargs):
self.lights = {}
self.name_lut = {}
def remove_service(self, zeroconf, type_, name):
print("Service %s removed" % (name,))
if name in self.name_lut:
del self.lights[self.name_lut[name]]
del self.name_lut[name]
def add_service(self, zeroconf, type_, name):
asyncio.ensure_future(self.found_service(zeroconf, type_, name))
async def found_service(self, zeroconf, type_, name):
info = await zeroconf.get_service_info(type_, name)
async with httpx.AsyncClient() as client:
res = await client.get(f"http://{info.server}:{info.port}/elgato/accessory-info", headers={"Accept": "application/json"})
accessory_info = res.json()
display_name = accessory_info['displayName']
print(f"Found light: {display_name}, {info}, {accessory_info}")
self.lights[display_name] = {"info": info, "accessory_info": accessory_info}
self.name_lut[name] = display_name
async def switch_on(self, light_name):
if light_name not in self.lights:
return
info = self.lights[light_name]["info"]
async with httpx.AsyncClient() as client:
await client.put(f"http://{info.server}:{info.port}/elgato/lights", json={"lights": [{"on": 1}]})
async def switch_off(self, light_name):
if light_name not in self.lights:
return
info = self.lights[light_name]["info"]
async with httpx.AsyncClient() as client:
await client.put(f"http://{info.server}:{info.port}/elgato/lights", json={"lights": [{"on": 0}]})
def get_pids():
for process_name in PROCESS_NAMES:
try:
yield subprocess.check_output(
f"pgrep {process_name}", shell=True, text=True
).strip()
except subprocess.CalledProcessError:
pass
async def stream_logs(light_controller, handle_camera_on, handle_camera_off):
camera_is_on = False
pids = {*get_pids()}
if not pids:
logger.error("No pids to watch")
return
args = " ".join(f"--process {pid}" for pid in pids)
cmd = f"log stream {args}"
logger.info("Watching logs")
proc = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
async for line_bytes in proc.stdout:
line = line_bytes.decode()
is_start = any(event in line for event in START_EVENTS)
is_end = any(event in line for event in END_EVENTS)
if not camera_is_on and is_start:
logger.info("Camera started")
camera_is_on = True
await handle_camera_on(light_controller)
if camera_is_on and is_end:
logger.info("Camera stopped")
camera_is_on = False
await handle_camera_off(light_controller)
async def handle_on(light_controller):
await light_controller.switch_on("desk")
async def handle_off(light_controller):
await light_controller.switch_off("desk")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
zeroconf = Zeroconf(loop)
controller = LightController()
browser = ServiceBrowser(zeroconf, "_elg._tcp.local.", listener=controller)
loop.create_task(stream_logs(controller, handle_camera_on=handle_on, handle_camera_off=handle_off))
try:
loop.run_forever()
except KeyboardInterrupt:
print("Unregistering...")
loop.run_until_complete(do_close(zeroconf))
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment