Skip to content

Instantly share code, notes, and snippets.

@dlech
Created June 25, 2019 23:16
Show Gist options
  • Save dlech/71344ceb5aa66862ad8932064bfe1a22 to your computer and use it in GitHub Desktop.
Save dlech/71344ceb5aa66862ad8932064bfe1a22 to your computer and use it in GitHub Desktop.
MicroPython bindings for libudev
# SPDX-License-Identifier: MIT
# Copyright (c) 2019 David Lechner <david@pybricks.com>
import ffi
from uerrno import ENOENT
# Shared-library handles opened through the ffi module; the classes in this
# file look up their C entry points on `_udev`.
_udev = ffi.open('libudev.so.1')
_libc = ffi.open('libc.so.6')
# Accessor for libc's `errno` symbol, declared with the 'i' type code.
# `_errno.get()` is read to build an OSError when a libudev call returns a
# NULL handle.
_errno = _libc.var("i", "errno")
class Udev():
    """Owner of a libudev context handle obtained from ``udev_new``.

    The handle is not released automatically: call ``__del__`` yourself or,
    preferably, use the object as a context manager.
    """
    _new = _udev.func('p', 'udev_new', '')
    _unref = _udev.func('p', 'udev_unref', 'p')

    def __init__(self):
        """Create a new libudev context.

        Raises
        ------
        OSError
            If ``udev_new`` returns NULL; the code is the current errno.
        """
        handle = Udev._new()
        self._handle = handle
        if not handle:
            raise OSError(_errno.get())

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference to the context if one is still held."""
        if not self._handle:
            return
        # Store whatever udev_unref returns so a later call sees the
        # updated handle value.
        self._handle = Udev._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()

    def scan(self, subsystem, **kwargs):
        """Scans for matching devices.

        Parameters
        ----------
        subsystem : string
            The subsystem (name of class or bus) to scan.
        **kwargs
            Property name/value pairs to match.

        Returns
        -------
        generator of Device
            Lazily creates a Device for each matching syspath.
        """
        with Enumerate(self) as enumerator:
            enumerator.add_match_subsystem(subsystem)
            for prop_name, prop_value in kwargs.items():
                enumerator.add_match_property(prop_name, prop_value)
            try:
                enumerator.scan_devices()
            except OSError as exc:
                # not sure why we get ENOENT even on success
                if exc.args[0] != ENOENT:
                    raise exc
            # NB: collect the paths eagerly; the enumerator is released by
            # the context manager on leaving this block, so the generator
            # returned below must not refer to it.
            syspaths = list(enumerator.list())
        return (Device.from_syspath(self, path) for path in syspaths)
class Enumerate():
    """Wrapper around a libudev enumeration handle (``udev_enumerate_new``).

    Add match filters, call ``scan_devices()``, then iterate ``list()``.
    The handle is not released automatically: call ``__del__`` yourself or,
    preferably, use the object as a context manager.
    """
    _new = _udev.func('p', 'udev_enumerate_new', 'p')
    _unref = _udev.func('p', 'udev_enumerate_unref', 'p')
    # The add_match_*/scan functions return a C int that is negative on
    # failure, so they are declared with the 'i' return code. With a 'p'
    # return code the value would be read as an unsigned pointer and the
    # `err < 0` checks below could never trigger.
    _add_match_subsystem = _udev.func('i', 'udev_enumerate_add_match_subsystem', 'ps')
    _add_match_property = _udev.func('i', 'udev_enumerate_add_match_property', 'pss')
    _scan_devices = _udev.func('i', 'udev_enumerate_scan_devices', 'p')
    _get_list_entry = _udev.func('p', 'udev_enumerate_get_list_entry', 'p')

    def __init__(self, udev):
        """Create an enumeration handle from ``udev`` (an object with a
        ``_handle`` attribute holding a libudev context).

        Raises
        ------
        OSError
            If ``udev_enumerate_new`` returns NULL; the code is the current
            errno.
        """
        self._handle = Enumerate._new(udev._handle)
        if not self._handle:
            raise OSError(_errno.get())

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference to the enumeration handle if one is held."""
        if self._handle:
            self._handle = Enumerate._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()

    def add_match_subsystem(self, subsystem):
        """Restrict the scan to ``subsystem``.

        Raises
        ------
        OSError
            With the positive error code if libudev returns a negative value.
        """
        err = Enumerate._add_match_subsystem(self._handle, subsystem)
        if err < 0:
            raise OSError(-err)

    def add_match_property(self, name, value):
        """Restrict the scan to devices whose property ``name`` matches
        ``value``.

        Raises
        ------
        OSError
            With the positive error code if libudev returns a negative value.
        """
        err = Enumerate._add_match_property(self._handle, name, value)
        if err < 0:
            raise OSError(-err)

    def scan_devices(self):
        """Run the scan using the filters added so far.

        Raises
        ------
        OSError
            With the positive error code if libudev returns a negative value.
        """
        err = Enumerate._scan_devices(self._handle)
        if err < 0:
            raise OSError(-err)

    def list(self):
        """Return a generator over the name of each list entry of the scan
        result (``Udev.scan`` uses these names as syspaths).

        The first entry is fetched immediately, not on first iteration.

        Raises
        ------
        OSError
            If ``udev_enumerate_get_list_entry`` returns NULL; the code is
            the current errno.
        """
        first = Enumerate._get_list_entry(self._handle)
        if not first:
            # NOTE(review): a NULL here may also mean "no matches" rather
            # than a failure -- confirm against the libudev version in use.
            raise OSError(_errno.get())

        # Named to avoid shadowing the builtin iter().
        def walk(entry):
            while entry:
                yield entry.name
                entry = entry.next

        return walk(ListEntry(first))
class Device():
    """Holder of a libudev device handle.

    Instances are normally created with ``Device.from_syspath``. The handle
    is not released automatically: call ``__del__`` yourself or, preferably,
    use the object as a context manager.
    """
    _new_from_syspath = _udev.func('p', 'udev_device_new_from_syspath', 'ps')
    _unref = _udev.func('p', 'udev_device_unref', 'p')
    _get_devnode = _udev.func('s', 'udev_device_get_devnode', 'p')

    @staticmethod
    def from_syspath(udev, syspath):
        """Create a Device for ``syspath`` using the context held by ``udev``.

        Raises
        ------
        OSError
            If ``udev_device_new_from_syspath`` returns NULL; the code is
            the current errno.
        """
        dev_ptr = Device._new_from_syspath(udev._handle, syspath)
        if dev_ptr:
            return Device(dev_ptr)
        raise OSError(_errno.get())

    def __init__(self, handle):
        """Wrap an existing device handle without validating it."""
        self._handle = handle

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference to the device if one is still held."""
        if not self._handle:
            return
        self._handle = Device._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()

    @property
    def devnode(self):
        """Result of ``udev_device_get_devnode`` for this device."""
        node = Device._get_devnode(self._handle)
        return node
class Monitor():
    """Holder of a libudev monitor handle created on the 'udev' netlink
    source.

    The handle is not released automatically: call ``__del__`` yourself or,
    preferably, use the object as a context manager.
    """
    _new_from_netlink = _udev.func('p', 'udev_monitor_new_from_netlink', 'ps')
    _unref = _udev.func('p', 'udev_monitor_unref', 'p')

    def __init__(self, udev):
        """Create a monitor from the context held by ``udev``.

        Raises
        ------
        OSError
            If ``udev_monitor_new_from_netlink`` returns NULL; the code is
            the current errno.
        """
        handle = Monitor._new_from_netlink(udev._handle, 'udev')
        self._handle = handle
        if not handle:
            raise OSError(_errno.get())

    # must be called manually! Use context manager!
    def __del__(self):
        """Drop the reference to the monitor if one is still held."""
        if not self._handle:
            return
        self._handle = Monitor._unref(self._handle)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.__del__()
class ListEntry():
    """Thin view over one libudev list entry handle.

    The wrapper never releases the handle; it only reads through it.
    """
    _get_next = _udev.func('p', 'udev_list_entry_get_next', 'p')
    _get_name = _udev.func('s', 'udev_list_entry_get_name', 'p')
    _get_value = _udev.func('s', 'udev_list_entry_get_value', 'p')

    def __init__(self, handle):
        """Wrap an existing list entry handle without validating it."""
        self._handle = handle

    @property
    def next(self):
        """The following entry as a ListEntry, or None when
        ``udev_list_entry_get_next`` returns NULL."""
        successor = ListEntry._get_next(self._handle)
        if not successor:
            return None
        return ListEntry(successor)

    @property
    def name(self):
        """Result of ``udev_list_entry_get_name`` for this entry."""
        entry_name = ListEntry._get_name(self._handle)
        return entry_name

    @property
    def value(self):
        """Result of ``udev_list_entry_get_value`` for this entry."""
        entry_value = ListEntry._get_value(self._handle)
        return entry_value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment