Created
November 2, 2021 17:32
-
-
Save markizano/1d1bfbaf65ce30dcba84b0d42a6575c2 to your computer and use it in GitHub Desktop.
xwait.py: Wait X number of seconds before returning control of the command after a period of inactivity via the mouse and keyboard.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os, sys | |
import time | |
import json | |
import threading | |
from libinput import LibInput, constant | |
from glob import glob | |
from kizano import getLogger | |
from pprint import pprint | |
log = getLogger(__name__) | |
class XWait(object): | |
''' | |
Watchdog daemon class that will monitor for keyboard and mouse input (KBM). | |
Each KBM input will result in the timer being reset to 0. | |
If the timer exceeds `--wait' seconds, then release control back to the terminal. | |
Usage: | |
xwait 600 && run-my-program.sh | |
Block and wait for ~600 seconds of inactivity from the mouse and keyboard before returning the terminal. | |
''' | |
def __init__(self, nsec=300, *args, **kwargs): | |
''' | |
Open connections to Xserver to start monitoring events. | |
@param nsec (int). Number of seconds to wait before releasing control again. | |
''' | |
super().__init__(*args, **kwargs) | |
log.debug('new XWait()') | |
# Initiate libinput for reading devices from the system bus. | |
self.xinput = LibInput() | |
self._initDevices() | |
self.timeout = nsec | |
self.lastSeen = { | |
'mouse': time.time(), | |
'keyboard': time.time(), | |
} | |
def __enter__(self, *args, **kwargs): | |
''' | |
For use of `with' operator. | |
''' | |
return self | |
def __exit__(self, *args, **kwargs): | |
''' | |
Close device connections for a clean exit. | |
''' | |
log.debug('XWait.__exit__()') | |
for device in self.devices: | |
self.xinput.path_remove_device(device) | |
def _initDevices(self): | |
''' | |
Allows us to collect devices from `/dev/input/*' to monitor for events. | |
''' | |
log.debug('XWait._initDevices()') | |
def isnotdir(f): | |
return not os.path.isdir(f) | |
# Get the list of input devices that not directories. | |
inDevices = list( filter(isnotdir, glob('/dev/input/*') ) ) | |
# Only include the devices that have the capabilities we want. | |
self.devices = [] | |
for inDevice in inDevices: | |
device = self.xinput.path_add_device(inDevice) | |
if not device: | |
# If this is not a valid device recognized by lininput, don't amend to the resulting array. | |
log.info('Device %s empty. Skipping...' % inDevice) | |
continue | |
# Filter through some inputs we only want to monitor. | |
if not device.has_capability(constant.DeviceCapability.POINTER) and not device.has_capability(constant.DeviceCapability.KEYBOARD): | |
log.info('Device %s not a pointer or keyboard. Skipping...' % device.get_sysname() ) | |
self.xinput.path_remove_device(device) | |
continue | |
if 'video' in device.get_name().lower(): | |
log.info('Device %s is a video device. Skipping...' % device.get_sysname() ) | |
self.xinput.path_remove_device(device) | |
continue | |
self.devices.append(device) | |
log.debug('Handling %d devices.' % len(self.devices)) | |
def lastSeenKeyboard(self): | |
''' | |
Gets the last time we saw the keyboard touched. | |
''' | |
return self.lastSeen['keyboard'] | |
def lastSeenMouse(self): | |
''' | |
Gets the last time we saw the mouse moved. | |
''' | |
return self.lastSeen['mouse'] | |
def get_event(self): | |
''' | |
Override to self.xinput.get_event(). | |
Wraps the result in a try/catch block to return None instead. | |
''' | |
log.debug('XWait.getEvent(%d)' % self.timeout) | |
events = self.xinput.get_event(self.timeout) | |
try: | |
for event in events: | |
yield event | |
except StopIteration: | |
return None | |
finally: | |
return None | |
def watchdog(self): | |
''' | |
Watches the mouse for events. Updates the last seen timestamp of the mouse having some activity. | |
''' | |
log.debug('XWait.watchdog()') | |
log.info('Ready and watching the mouse/keyboard! Current time: %d' % time.time()) | |
actions = { | |
constant.Event.POINTER_MOTION: 'mouse moved!', | |
constant.Event.POINTER_BUTTON: 'mouse clicked!', | |
constant.Event.POINTER_AXIS: 'mouse scrolled!', | |
constant.Event.KEYBOARD_KEY: 'keyboard type!', | |
} | |
for event in self.get_event(): | |
if event.type in set(actions.keys()): | |
action = actions.get(event.type) | |
t = time.time() | |
log.debug('Seen %s! Time: %s' % (action, t)) | |
self.lastSeen['mouse'] = t | |
return 0 | |
@staticmethod | |
def main(): | |
log.debug('Start! Current time: %s' % time.time()) | |
try: | |
timeout = int(sys.argv[1]) | |
except IndexError: | |
print('Usage: xwait [seconds]') | |
return 8 | |
except ValueError: | |
print('Usage: xwait [seconds]') | |
return 6 | |
with XWait(timeout) as watchdog: | |
result = watchdog.watchdog() | |
watchdog.lastSeen['ctime'] = time.time() | |
log.info('Last seen mouse: %(mouse)d. Last seen keyboard %(keyboard)d. Current time %(ctime)d. Exiting...' % watchdog.lastSeen) | |
return result | |
if __name__ == '__main__': | |
import logging | |
log.setLevel(logging.INFO) | |
sys.exit(XWait.main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment