Skip to content

Instantly share code, notes, and snippets.

@markizano
Created November 2, 2021 17:32
Show Gist options
  • Save markizano/1d1bfbaf65ce30dcba84b0d42a6575c2 to your computer and use it in GitHub Desktop.
Save markizano/1d1bfbaf65ce30dcba84b0d42a6575c2 to your computer and use it in GitHub Desktop.
xwait.py: Wait X number of seconds before returning control of the command after a period of inactivity via the mouse and keyboard.
#!/usr/bin/env python3
import os, sys
import time
import json
import threading
from libinput import LibInput, constant
from glob import glob
from kizano import getLogger
from pprint import pprint
log = getLogger(__name__)
class XWait(object):
'''
Watchdog daemon class that will monitor for keyboard and mouse input (KBM).
Each KBM input will result in the timer being reset to 0.
If the timer exceeds `--wait' seconds, then release control back to the terminal.
Usage:
xwait 600 && run-my-program.sh
Block and wait for ~600 seconds of inactivity from the mouse and keyboard before returning the terminal.
'''
def __init__(self, nsec=300, *args, **kwargs):
'''
Open connections to Xserver to start monitoring events.
@param nsec (int). Number of seconds to wait before releasing control again.
'''
super().__init__(*args, **kwargs)
log.debug('new XWait()')
# Initiate libinput for reading devices from the system bus.
self.xinput = LibInput()
self._initDevices()
self.timeout = nsec
self.lastSeen = {
'mouse': time.time(),
'keyboard': time.time(),
}
def __enter__(self, *args, **kwargs):
'''
For use of `with' operator.
'''
return self
def __exit__(self, *args, **kwargs):
'''
Close device connections for a clean exit.
'''
log.debug('XWait.__exit__()')
for device in self.devices:
self.xinput.path_remove_device(device)
def _initDevices(self):
'''
Allows us to collect devices from `/dev/input/*' to monitor for events.
'''
log.debug('XWait._initDevices()')
def isnotdir(f):
return not os.path.isdir(f)
# Get the list of input devices that not directories.
inDevices = list( filter(isnotdir, glob('/dev/input/*') ) )
# Only include the devices that have the capabilities we want.
self.devices = []
for inDevice in inDevices:
device = self.xinput.path_add_device(inDevice)
if not device:
# If this is not a valid device recognized by lininput, don't amend to the resulting array.
log.info('Device %s empty. Skipping...' % inDevice)
continue
# Filter through some inputs we only want to monitor.
if not device.has_capability(constant.DeviceCapability.POINTER) and not device.has_capability(constant.DeviceCapability.KEYBOARD):
log.info('Device %s not a pointer or keyboard. Skipping...' % device.get_sysname() )
self.xinput.path_remove_device(device)
continue
if 'video' in device.get_name().lower():
log.info('Device %s is a video device. Skipping...' % device.get_sysname() )
self.xinput.path_remove_device(device)
continue
self.devices.append(device)
log.debug('Handling %d devices.' % len(self.devices))
def lastSeenKeyboard(self):
'''
Gets the last time we saw the keyboard touched.
'''
return self.lastSeen['keyboard']
def lastSeenMouse(self):
'''
Gets the last time we saw the mouse moved.
'''
return self.lastSeen['mouse']
def get_event(self):
'''
Override to self.xinput.get_event().
Wraps the result in a try/catch block to return None instead.
'''
log.debug('XWait.getEvent(%d)' % self.timeout)
events = self.xinput.get_event(self.timeout)
try:
for event in events:
yield event
except StopIteration:
return None
finally:
return None
def watchdog(self):
'''
Watches the mouse for events. Updates the last seen timestamp of the mouse having some activity.
'''
log.debug('XWait.watchdog()')
log.info('Ready and watching the mouse/keyboard! Current time: %d' % time.time())
actions = {
constant.Event.POINTER_MOTION: 'mouse moved!',
constant.Event.POINTER_BUTTON: 'mouse clicked!',
constant.Event.POINTER_AXIS: 'mouse scrolled!',
constant.Event.KEYBOARD_KEY: 'keyboard type!',
}
for event in self.get_event():
if event.type in set(actions.keys()):
action = actions.get(event.type)
t = time.time()
log.debug('Seen %s! Time: %s' % (action, t))
self.lastSeen['mouse'] = t
return 0
@staticmethod
def main():
log.debug('Start! Current time: %s' % time.time())
try:
timeout = int(sys.argv[1])
except IndexError:
print('Usage: xwait [seconds]')
return 8
except ValueError:
print('Usage: xwait [seconds]')
return 6
with XWait(timeout) as watchdog:
result = watchdog.watchdog()
watchdog.lastSeen['ctime'] = time.time()
log.info('Last seen mouse: %(mouse)d. Last seen keyboard %(keyboard)d. Current time %(ctime)d. Exiting...' % watchdog.lastSeen)
return result
if __name__ == '__main__':
import logging
log.setLevel(logging.INFO)
sys.exit(XWait.main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment