Skip to content

Instantly share code, notes, and snippets.

@madmo
Created August 31, 2021 05:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save madmo/18ff6e67a6974e4f109c5730ba73437b to your computer and use it in GitHub Desktop.
Save madmo/18ff6e67a6974e4f109c5730ba73437b to your computer and use it in GitHub Desktop.
Simple webcam on/off script used to control an on-air sign running Tasmota firmware.
#!/usr/bin/python
import pyinotify
import asyncio
import aiohttp
import sys
def debounce(wait):
    """Return a decorator that collapses bursts of calls into the last one.

    Every call to the decorated function cancels the previously scheduled
    call (if it has not run yet) and schedules a new one ``wait`` seconds
    later, so only the final call of a burst reaches the wrapped function.

    Args:
        wait: Delay in seconds, passed to ``asyncio.sleep`` before the
            wrapped function is invoked.

    Returns:
        A decorator. The decorated function returns ``None`` immediately and
        discards the wrapped function's return value. It schedules its work
        with ``asyncio.ensure_future``, so an event loop must be available
        when it is called.
    """
    import functools  # local import keeps this block self-contained

    def decorator(fn):
        @functools.wraps(fn)  # preserve fn's __name__/__doc__ on the wrapper
        def debounced(*args, **kwargs):
            async def call_it():
                await asyncio.sleep(wait)
                fn(*args, **kwargs)

            # Drop the still-pending call, if any. Cancelling a task that
            # has already finished is a harmless no-op.
            if debounced.t is not None:
                debounced.t.cancel()
            debounced.t = asyncio.ensure_future(call_it())

        # Handle of the most recently scheduled call. It lives on the
        # function object, so it is shared by every caller of this
        # decorated function (including all instances when used on a method).
        debounced.t = None
        return debounced

    return decorator
class WebcamWatch:
    """Watch a device node with inotify and report its in-use state over HTTP.

    Open/close events on the watched path are counted by ``EventHandler``;
    whenever the path goes from unused to used (or back), ``notify`` sends a
    ``Power On`` / ``Power Off`` command to ``http://<host>/cm``.
    """

    def __init__(self, loop, device, host):
        """Register the inotify watch and attach it to ``loop``.

        Args:
            loop: asyncio event loop handed to ``pyinotify.AsyncioNotifier``.
            device: Path to watch for open/close events.
            host: Host name or address used to build the ``/cm`` request URL.
        """
        self.host = host
        self.watch_manager = pyinotify.WatchManager()
        self.watch_manager.add_watch(
            device,
            pyinotify.IN_OPEN | pyinotify.IN_CLOSE_NOWRITE | pyinotify.IN_CLOSE_WRITE,
        )
        self.handler = WebcamWatch.EventHandler(self.notify)
        self.notifier = pyinotify.AsyncioNotifier(
            self.watch_manager, loop, default_proc_fun=self.handler
        )

    def __del__(self):
        """Stop the notifier when the watcher is garbage collected."""
        # __init__ may have raised before the notifier was created; avoid a
        # secondary AttributeError from the finalizer in that case.
        notifier = getattr(self, "notifier", None)
        if notifier is not None:
            notifier.stop()

    class EventHandler(pyinotify.ProcessEvent):
        """Track how many openers the watched path has and report changes."""

        def __init__(self, notify, *args, **kwargs):
            """Store the callback and reset the open counter.

            Args:
                notify: Coroutine function called with the current active
                    flag (``True``/``False``) after the debounce delay.
                *args: Forwarded to ``pyinotify.ProcessEvent``.
                **kwargs: Forwarded to ``pyinotify.ProcessEvent``.
            """
            self.count = 0  # number of opens not yet matched by a close
            self.notify = notify
            self.active = False  # last state decided from the counter
            super().__init__(*args, **kwargs)

        @debounce(1)
        def _notify(self):
            """Schedule ``notify`` with the state current at firing time."""
            # self.active is read when the debounced call fires, so a quick
            # open/close pair reports only the final state.
            asyncio.ensure_future(self.notify(self.active))

        def process_IN_OPEN(self, event):
            """Count an open and report the transition to active."""
            self.count += 1
            if not self.active:
                self.active = True
                self._notify()

        def process_IN_CLOSE(self, event):
            """Count a close and report the transition to inactive.

            NOTE(review): relies on pyinotify routing both IN_CLOSE_WRITE and
            IN_CLOSE_NOWRITE to this family-level handler - confirm against
            the pyinotify ProcessEvent documentation.
            """
            # Clamp at zero: closes may be seen for opens that happened
            # before the watch was installed.
            self.count = max(self.count - 1, 0)
            if self.active and self.count == 0:
                self.active = False
                self._notify()

    async def notify(self, isActive):
        """Send the power command matching ``isActive`` to the host.

        Args:
            isActive: ``True`` sends ``Power On``, otherwise ``Power Off``.
        """
        params = {"cmnd": "Power On" if isActive else "Power Off"}
        url = "http://{}/cm".format(self.host)
        # The context managers close the response and the session even when
        # the request raises, so a failing request cannot leak the session.
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=params) as resp:
                await resp.text()
def main():
    """Watch ``/dev/video0`` and forward its state to the host in ``argv[1]``."""
    if len(sys.argv) < 2:
        sys.exit("usage: {} HOST".format(sys.argv[0]))

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated in current Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # Keep a reference for the lifetime of the loop: WebcamWatch stops its
    # notifier when it is garbage collected.
    watch = WebcamWatch(loop, "/dev/video0", sys.argv[1])
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this script; exit quietly.
        pass
    finally:
        loop.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment