Skip to content

Instantly share code, notes, and snippets.

@jizhilong
Last active February 20, 2024 05:32
Show Gist options
  • Save jizhilong/1b158b9b2446d01b1c77510056e98f73 to your computer and use it in GitHub Desktop.
Save jizhilong/1b158b9b2446d01b1c77510056e98f73 to your computer and use it in GitHub Desktop.
timerfd_create/timerfd_settime/timerfd_gettime implementations based on ctypes and libc
"""
timerfd_* functions and a demo for Python versions prior to 3.13, implemented with ctypes and libc.
"""
try:
from os import timerfd_create, timerfd_settime, timerfd_gettime, timerfd_settime_ns, timerfd_gettime_ns
from os import TFD_NONBLOCK, TFD_CLOEXEC, TFD_TIMER_ABSTIME, TFD_TIMER_CANCEL_ON_SET
except ImportError:
import os
import time
from typing import Optional
from ctypes import CDLL, c_long, Structure, c_int, get_errno, POINTER, byref
from ctypes.util import find_library
class TimeSpec(Structure):
    """ctypes layout of a C ``struct timespec``: seconds plus nanoseconds."""

    # Both members are declared as C ``long``.
    _fields_ = [("tv_sec", c_long), ("tv_nsec", c_long)]
class ITimerSpec(Structure):
    """ctypes layout of a C ``struct itimerspec``.

    ``it_interval`` comes first and ``it_value`` second; both are
    ``TimeSpec`` structures.
    """

    _fields_ = [("it_interval", TimeSpec), ("it_value", TimeSpec)]


# Pointer-to-ITimerSpec type, used when declaring the libc prototypes.
ITimerSpecP = POINTER(ITimerSpec)
# Clock identifiers that can be passed to timerfd_create (Linux values).
CLOCK_REALTIME = 0
CLOCK_MONOTONIC = 1
CLOCK_BOOTTIME = 7
CLOCK_REALTIME_ALARM = 8
CLOCK_BOOTTIME_ALARM = 9

# Flags for timerfd_create.  Linux defines TFD_NONBLOCK and TFD_CLOEXEC as
# aliases of O_NONBLOCK and O_CLOEXEC, whose numeric values differ between
# architectures, so take them from ``os`` instead of hard-coding them.
# (The previous literal 0x8000 for TFD_CLOEXEC was not O_CLOEXEC, which is
# 0x80000 on x86/ARM, and made timerfd_create fail with EINVAL.)
TFD_NONBLOCK = os.O_NONBLOCK
TFD_CLOEXEC = os.O_CLOEXEC

# Flags for timerfd_settime.
TFD_TIMER_ABSTIME = 1
TFD_TIMER_CANCEL_ON_SET = 2
# Locate and load the C library.  ``use_errno=True`` lets the wrappers read
# the errno left by a failed call through ctypes.get_errno().
libc_name = find_library('c')
if not libc_name:
    raise ImportError("cannot find libc")
libc = CDLL(libc_name, use_errno=True)

# int timerfd_create(int clockid, int flags);
c_timerfd_create = libc.timerfd_create
c_timerfd_create.restype = c_int
c_timerfd_create.argtypes = [c_int, c_int]

# int timerfd_settime(int fd, int flags,
#                     const struct itimerspec *new_value,
#                     struct itimerspec *old_value);
c_timerfd_settime = libc.timerfd_settime
c_timerfd_settime.restype = c_int
c_timerfd_settime.argtypes = [c_int, c_int, ITimerSpecP, ITimerSpecP]

# int timerfd_gettime(int fd, struct itimerspec *curr_value);
c_timerfd_gettime = libc.timerfd_gettime
c_timerfd_gettime.restype = c_int
c_timerfd_gettime.argtypes = [c_int, ITimerSpecP]
def timerfd_create(clock_id=CLOCK_REALTIME, /, *, flags=0) -> int:
    """Create a timer file descriptor through libc ``timerfd_create``.

    Args:
        clock_id: Clock identifier handed to libc (positional-only);
            defaults to ``CLOCK_REALTIME``.
        flags: Flag bits handed to libc unchanged (keyword-only).

    Returns:
        The new file descriptor.

    Raises:
        OSError: If libc reports failure.  ``errno`` and ``strerror`` are
            set on the exception.
    """
    fd = c_timerfd_create(clock_id, flags)
    if fd < 0:
        err_code = get_errno()
        # Two-argument form so that ``exc.errno`` is populated and Python
        # can select the matching OSError subclass; a single formatted
        # string would leave ``errno`` as None.
        raise OSError(err_code, os.strerror(err_code))
    return fd
def _split_seconds(seconds: float) -> tuple[int, int]:
    """Split float seconds into ``(tv_sec, tv_nsec)`` for a C timespec."""
    whole = int(seconds)
    # Round instead of truncating: the float subtraction leaves values such
    # as 0.2999999999999998 for 2.3, which truncation turns into
    # 299999999 ns rather than the intended 300000000 ns.
    nanos = round((seconds - whole) * 1e9)
    if nanos >= 10**9:
        # Rounding can reach a full second; carry it so tv_nsec stays valid.
        whole += 1
        nanos -= 10**9
    return whole, nanos


def timerfd_settime(fd: int, /, *,
                    flags: int = 0,
                    initial: float = 0.0, interval: float = 0.0) -> tuple[float, float]:
    """Arm or disarm a timer fd using times given in (float) seconds.

    Args:
        fd: Timer file descriptor (positional-only).
        flags: Flag bits handed to libc ``timerfd_settime`` unchanged.
        initial: Value stored in ``it_value`` of the new setting.
        interval: Value stored in ``it_interval`` of the new setting.

    Returns:
        ``(it_value, it_interval)`` of the ``old_value`` structure filled in
        by libc, converted to float seconds.

    Raises:
        OSError: If libc reports failure.  ``errno`` and ``strerror`` are
            set on the exception.
    """
    new_spec = ITimerSpec()
    new_spec.it_value.tv_sec, new_spec.it_value.tv_nsec = _split_seconds(initial)
    new_spec.it_interval.tv_sec, new_spec.it_interval.tv_nsec = _split_seconds(interval)
    old_spec = ITimerSpec()
    if c_timerfd_settime(fd, flags, byref(new_spec), byref(old_spec)) < 0:
        err_code = get_errno()
        # Two-argument form so callers can inspect ``exc.errno``.
        raise OSError(err_code, os.strerror(err_code))
    old_initial = old_spec.it_value.tv_sec + old_spec.it_value.tv_nsec * 1e-9
    old_interval = old_spec.it_interval.tv_sec + old_spec.it_interval.tv_nsec * 1e-9
    return old_initial, old_interval
def timerfd_settime_ns(fd: int, /, *,
                       flags: int = 0,
                       initial: int = 0, interval: int = 0) -> tuple[int, int]:
    """Arm or disarm a timer fd using times given in integer nanoseconds.

    Args:
        fd: Timer file descriptor (positional-only).
        flags: Flag bits handed to libc ``timerfd_settime`` unchanged.
        initial: Nanoseconds stored in ``it_value`` of the new setting.
        interval: Nanoseconds stored in ``it_interval`` of the new setting.

    Returns:
        ``(it_value, it_interval)`` of the ``old_value`` structure filled in
        by libc, converted to integer nanoseconds.

    Raises:
        OSError: If libc reports failure.  ``errno`` and ``strerror`` are
            set on the exception.
    """
    new_spec = ITimerSpec()
    new_spec.it_value.tv_sec, new_spec.it_value.tv_nsec = divmod(initial, 10**9)
    new_spec.it_interval.tv_sec, new_spec.it_interval.tv_nsec = divmod(interval, 10**9)
    old_spec = ITimerSpec()
    if c_timerfd_settime(fd, flags, byref(new_spec), byref(old_spec)) < 0:
        err_code = get_errno()
        # Two-argument form so callers can inspect ``exc.errno``; a single
        # formatted string would leave it as None.
        raise OSError(err_code, os.strerror(err_code))
    old_initial = old_spec.it_value.tv_sec * 10**9 + old_spec.it_value.tv_nsec
    old_interval = old_spec.it_interval.tv_sec * 10**9 + old_spec.it_interval.tv_nsec
    return old_initial, old_interval
def timerfd_gettime(fd) -> tuple[float, float]:
    """Read the current setting of a timer fd in (float) seconds.

    Args:
        fd: Timer file descriptor.

    Returns:
        ``(it_value, it_interval)`` of the structure filled in by libc
        ``timerfd_gettime``, converted to float seconds.

    Raises:
        OSError: If libc reports failure.  ``errno`` and ``strerror`` are
            set on the exception.
    """
    current = ITimerSpec()
    if c_timerfd_gettime(fd, byref(current)) < 0:
        err_code = get_errno()
        # Two-argument form so callers can inspect ``exc.errno``; a single
        # formatted string would leave it as None.
        raise OSError(err_code, os.strerror(err_code))
    remaining = current.it_value.tv_sec + current.it_value.tv_nsec * 1e-9
    period = current.it_interval.tv_sec + current.it_interval.tv_nsec * 1e-9
    return remaining, period
def timerfd_gettime_ns(fd) -> tuple[int, int]:
    """Read the current setting of a timer fd in integer nanoseconds.

    Args:
        fd: Timer file descriptor.

    Returns:
        ``(it_value, it_interval)`` of the structure filled in by libc
        ``timerfd_gettime``, converted to integer nanoseconds.

    Raises:
        OSError: If libc reports failure.  ``errno`` and ``strerror`` are
            set on the exception.
    """
    current = ITimerSpec()
    if c_timerfd_gettime(fd, byref(current)) < 0:
        err_code = get_errno()
        # Two-argument form so callers can inspect ``exc.errno``; a single
        # formatted string would leave it as None.
        raise OSError(err_code, os.strerror(err_code))
    remaining = current.it_value.tv_sec * 10**9 + current.it_value.tv_nsec
    period = current.it_interval.tv_sec * 10**9 + current.it_interval.tv_nsec
    return remaining, period
if __name__ == '__main__':
    # Demo: a UDP responder with an idle timer.  Each received datagram
    # pushes the timer back to 5 seconds; timer expirations are logged.
    import logging
    # ``os`` and ``time`` are imported here because the module-level import
    # of them only happens in the ctypes fallback branch; when the stdlib
    # timerfd functions are available they would otherwise be undefined.
    import os
    import selectors
    import socket
    import sys
    import time

    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    logger = logging.getLogger("timerfd_demo")

    # Pass the clock explicitly: the stdlib os.timerfd_create requires it.
    timer_fd = timerfd_create(time.CLOCK_REALTIME)
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock, \
                selectors.DefaultSelector() as selector:
            sock.bind(('0.0.0.0', 51121))
            selector.register(sock, selectors.EVENT_READ)
            selector.register(timer_fd, selectors.EVENT_READ)
            timerfd_settime(timer_fd, initial=1, interval=5)
            while True:
                for key, _mask in selector.select():
                    if key.fileobj == sock:
                        data, addr = sock.recvfrom(1024)
                        # Traffic seen: restart the idle countdown.
                        timerfd_settime(timer_fd, initial=5, interval=5)
                        logger.info("recv %s from %s", data, addr)
                        sock.sendto(b'hello', addr)
                    elif key.fileobj == timer_fd:
                        # A timerfd read yields an unsigned 64-bit expiration
                        # count in host byte order, not always little-endian.
                        times = int.from_bytes(os.read(timer_fd, 8), sys.byteorder)
                        logger.info("idle timer triggerd %s more times", times)
                    else:
                        logger.info("unknown fd %s", key.fileobj)
    finally:
        # Release the timer descriptor even when the loop is interrupted.
        os.close(timer_fd)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment