Skip to content

Instantly share code, notes, and snippets.

@cheery
Created October 16, 2012 18:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cheery/3901170 to your computer and use it in GitHub Desktop.
Save cheery/3901170 to your computer and use it in GitHub Desktop.
import errno
import struct
# Binary layout of one record read from an event device, in native byte
# order and alignment: two longs (seconds, microseconds), two unsigned
# shorts (type, code) and one int (value).
fmt = "llHHi"
# Number of bytes occupied by a single packed record.
fmt_len = struct.Struct(fmt).size
class EvDev(object):
    """One opened input event device.

    Attributes:
        devname: device node path the file object was opened from.
        fd: file-like object supporting ``read``, ``close`` and ``fileno``.
        id_serial: value of the device's ``ID_SERIAL`` property (may be None).
        id_path_tag: value of the device's ``ID_PATH_TAG`` property
            (may be None).
        closed: True once the device is known to be gone or was closed.

    Assign a callable taking ``(timestamp, type, code, value)`` to
    ``on_event`` on an instance to receive decoded events.
    """

    def on_event(self, timestamp, type, code, value):
        """Default event handler: ignore the event."""
        return None

    def __init__(self, devname, fd, id_serial, id_path_tag):
        self.devname = devname
        self.fd = fd
        self.id_serial = id_serial
        self.id_path_tag = id_path_tag
        self.closed = False

    def fileno(self):
        """Return the descriptor of the underlying file (for ``select``)."""
        return self.fd.fileno()

    def read(self):
        """Read one record and dispatch it to ``on_event``.

        When the read fails with ENODEV the device is marked ``closed``
        instead of raising. Any other IOError propagates.
        """
        # Only the read itself is guarded, so an IOError raised by a user
        # supplied on_event handler is never mistaken for device removal.
        try:
            data = self.fd.read(fmt_len)
        except IOError as e:
            if e.errno != errno.ENODEV:
                raise
            self.closed = True
            return
        if data is None:
            # A non-blocking file object had nothing to deliver.
            return
        if len(data) < fmt_len:
            # End of file or a truncated record: there is nothing valid to
            # unpack, so treat the device as gone rather than crash.
            self.closed = True
            return
        sec, usec, type, code, value = struct.unpack(fmt, data)
        timestamp = sec + float(usec) / 1000000
        self.on_event(timestamp, type, code, value)

    def close(self):
        """Close the underlying file and mark the device ``closed``.

        Safe to call repeatedly. ENODEV from the close is ignored; any
        other IOError propagates and leaves ``closed`` untouched.
        """
        # read() may already have set self.closed on ENODEV without closing
        # the file, so consult the file object itself when it can tell us;
        # otherwise the descriptor would leak.
        if not getattr(self.fd, 'closed', self.closed):
            try:
                self.fd.close()
            except IOError as e:
                if e.errno != errno.ENODEV:
                    raise
        self.closed = True

    def __repr__(self):
        # fileno() raises once the file is closed; repr must keep working
        # because removal callbacks receive the device after close().
        try:
            fileno = "%i" % self.fileno()
        except (ValueError, IOError):
            fileno = "closed"
        return "EvDev(%r, fileno(%s), %r, %r)" % (
            self.devname, fileno, self.id_serial, self.id_path_tag)
import pyudev, io, os
from evdev import EvDev
def default_collector(device):
    """Decide whether *device* should be collected.

    A device qualifies when the last component of its ``device_path``
    starts with ``event`` and at least one of the properties
    ``ID_INPUT_KEYBOARD``, ``ID_INPUT_MOUSE`` or ``ID_INPUT_JOYSTICK``
    equals ``'1'``.

    Args:
        device: object exposing ``device_path`` and a mapping-style
            ``get(name)``.

    Returns:
        bool: True if the device qualifies, False otherwise (always a
        real bool, never None).
    """
    if not os.path.basename(device.device_path).startswith('event'):
        return False
    wanted = ('ID_INPUT_KEYBOARD', 'ID_INPUT_MOUSE', 'ID_INPUT_JOYSTICK')
    return any(device.get(prop) == '1' for prop in wanted)
def open_evdev(dev):
    """Open the device node described by *dev* and wrap it in an EvDev.

    The node path comes from the ``DEVNAME`` property and is opened
    unbuffered for reading; ``ID_SERIAL`` and ``ID_PATH_TAG`` are passed
    through to the wrapper unchanged.
    """
    # Look up all three properties before touching the filesystem.
    node, serial, path_tag = [
        dev.get(key) for key in ('DEVNAME', 'ID_SERIAL', 'ID_PATH_TAG')
    ]
    handle = io.FileIO(node, 'r')
    return EvDev(node, handle, serial, path_tag)
class Devices(object):
    """Tracks the set of opened input devices via a udev monitor.

    The object exposes ``fileno()`` and ``read()`` so it can be passed to
    ``select`` alongside the devices it yields when iterated.

    Args:
        on_remove: called with the EvDev after a device was removed and
            closed.
        on_add: called with the EvDev after a device was opened.
        collector: predicate deciding which udev devices are tracked.

    The callback defaults are the builtin ``id``, used here merely as a
    harmless one-argument no-op.
    """

    def __init__(self, on_remove=id, on_add=id, collector=default_collector):
        self.udev = pyudev.Context()
        self.udev_mon = pyudev.Monitor.from_netlink(self.udev)
        self.udev_mon.filter_by(subsystem='input')
        # Opened devices keyed by their device node path.
        self.devices = {}
        self.on_remove = on_remove
        self.on_add = on_add
        self.collector = collector

    def _register(self, dev):
        """Open *dev*, remember it by device name and notify ``on_add``."""
        evdev = open_evdev(dev)
        self.devices[evdev.devname] = evdev
        self.on_add(evdev)

    def init(self):
        """Open every currently present device accepted by the collector,
        then start the udev monitor."""
        for dev in self.udev.list_devices(subsystem='input'):
            if self.collector(dev):
                self._register(dev)
        self.udev_mon.start()

    def fileno(self):
        """Return the monitor's descriptor (for ``select``)."""
        return self.udev_mon.fileno()

    def __iter__(self):
        """Yield the tracked devices that are not marked closed."""
        for device in self.devices.values():
            if not device.closed:
                yield device

    def read(self):
        """Receive one udev event and update the tracked devices.

        ``add`` opens and registers the device, ``remove`` closes and
        forgets it. Events for devices rejected by the collector are
        ignored.

        Raises:
            Exception: for any other action on an accepted device; the
                message describes the action and the device.
        """
        act, dev = self.udev_mon.receive_device()
        if not self.collector(dev):
            return
        if act == 'add':
            self._register(dev)
        elif act == 'remove':
            evdev = self.devices.pop(dev.get('DEVNAME'), None)
            if evdev is not None:
                evdev.close()
                self.on_remove(evdev)
        else:
            # Build the report from `dev` (the name actually bound here)
            # and use get() so a missing property cannot mask the real
            # error with a KeyError.
            details = {
                'act': act,
                'ID_SERIAL': dev.get('ID_SERIAL'),
                'ID_PATH_TAG': dev.get('ID_PATH_TAG'),
                'DEVNAME': dev.get('DEVNAME'),
            }
            raise Exception("%(act)6s %(ID_SERIAL)20s @ %(ID_PATH_TAG)30s -> %(DEVNAME)s" % details)
import humani
import select
import time
# Events counted since the last report; the main loop prints and resets it.
received = 0


def on_add_device(device):
    """Announce a newly added device and start counting its events.

    Prints ``+ <device>`` and installs an ``on_event`` handler on the
    device that increments the module-level ``received`` counter.
    """
    # Single-argument print form: identical output on Python 2 and 3.
    print("+ %s" % (device,))

    def on_event(timestamp, type, code, value):
        global received
        received += 1
        # Debugging aid:
        # print("%r[%r] -> %r(%r,%r)" % (device, timestamp, type, code, value))

    device.on_event = on_event
def on_remove_device(device):
    """Announce a removed device by printing ``- <device>``."""
    # Single-argument print form: identical output on Python 2 and 3.
    print("- %s" % (device,))
# Demo: watch all collected input devices and report, roughly every
# REPORT_INTERVAL seconds, how many events were received.
devices = humani.Devices(
    on_add=on_add_device,
    on_remove=on_remove_device,
)

# Seconds between two "events received" reports.
REPORT_INTERVAL = 2.0

last_report = time.time()
devices.init()
while True:
    now = time.time()
    # Time left until the next report is due. Clamp at zero: the interval
    # can expire between the check at the bottom of the loop and this
    # point, and select() rejects a negative timeout.
    timeout = max(0.0, REPORT_INTERVAL - (now - last_report))
    # The monitor itself plus every device that is still open.
    rfds = [devices] + list(devices)
    r, w, e = select.select(rfds, (), (), timeout)
    for fd in r:
        fd.read()
    now = time.time()
    if now - last_report > REPORT_INTERVAL:
        print("%i events received" % received)
        received = 0
        last_report = now
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment