Skip to content

Instantly share code, notes, and snippets.

@tlynn
Last active June 22, 2020 03:02
Show Gist options
  • Save tlynn/5278174 to your computer and use it in GitHub Desktop.
Save tlynn/5278174 to your computer and use it in GitHub Desktop.
Use cffi (cffi.rtfd.org) to wrap the pthread set/get affinity functions.
'''
Wrap the functions to tie threads to particular CPU cores (affinity) on Linux.
'''
import os
import cffi
__all__ = ['pthread_self', 'pthread_getaffinity_np', 'pthread_setaffinity_np']
ffi = cffi.FFI()
# XXX: Internal detail from Linux /usr/include/bits/pthreadtypes.h
ffi.cdef("typedef unsigned long pthread_t;")
ffi.cdef("typedef ... cpu_set_t;")
ffi.cdef(""" // from the man pages
pthread_t pthread_self(void);
int pthread_setaffinity_np(pthread_t thread, size_t cpusetsize,
const cpu_set_t *cpuset);
int pthread_getaffinity_np(pthread_t thread, size_t cpusetsize,
cpu_set_t *cpuset);
""")
# Expose sizeof(cpu_set_t) to enable allocating CPU sets in Python.
ffi.cdef("static const size_t cpusetsize;")
# Functions wrapping the macros for manipulating CPU sets ("man 3 CPU_SET").
ffi.cdef("""
static int CPU_SETSIZE_;
static void CPU_ZERO_(cpu_set_t *set);
static void CPU_SET_(int cpu, cpu_set_t *set);
static void CPU_CLR_(int cpu, cpu_set_t *set);
static int CPU_ISSET_(int cpu, cpu_set_t *set);
""")
C = ffi.verify(""" // from the man pages
#ifndef _GNU_SOURCE
# define _GNU_SOURCE
#endif
#include <pthread.h>
// The exposed implementations.
static const size_t cpusetsize = sizeof(cpu_set_t);
static int CPU_SETSIZE_ = CPU_SETSIZE;
static void CPU_ZERO_(cpu_set_t *set) { CPU_ZERO(set); }
static void CPU_SET_(int cpu, cpu_set_t *set) { CPU_SET(cpu, set); }
static void CPU_CLR_(int cpu, cpu_set_t *set) { CPU_CLR(cpu, set); }
static int CPU_ISSET_(int cpu, cpu_set_t *set) { return CPU_ISSET(cpu, set); }
""")
pthread_self = C.pthread_self # threading.current_thread().ident also works
def pthread_getaffinity_np(thread_ident):
'''Get the list of CPU IDs that the thread with the given ident is
allowed to run on.
'''
threadptr = ffi.cast("pthread_t", thread_ident)
cpuset = ffi.cast('cpu_set_t*', ffi.new('unsigned char[]', C.cpusetsize))
C.CPU_ZERO_(cpuset)
ret = C.pthread_getaffinity_np(threadptr, C.cpusetsize, cpuset)
if ret != 0:
raise OSError(ret, "pthread_getaffinity_np: %s" % os.strerror(ret))
cpus = []
for cpu_id in xrange(C.CPU_SETSIZE_):
if C.CPU_ISSET_(cpu_id, cpuset):
cpus.append(cpu_id)
return cpus
def pthread_setaffinity_np(thread_ident, cpus):
'''Set the thread with the given ident to only be allowed to run on the
specified cpus (a container type).
'''
threadptr = ffi.cast("pthread_t", thread_ident)
cpuset = ffi.cast('cpu_set_t*', ffi.new('unsigned char[]', C.cpusetsize))
C.CPU_ZERO_(cpuset)
for cpu_id in cpus:
C.CPU_SET_(cpu_id, cpuset)
ret = C.pthread_setaffinity_np(threadptr, C.cpusetsize, cpuset)
if ret != 0:
raise OSError(ret, "pthread_setaffinity_np: %s" % os.strerror(ret))
def demo():
'''Run each CPU core in turn at 100% for 5 seconds.'''
# Watch this in a monitoring tool such as htop.
import multiprocessing, time, threading
print 'Before:', pthread_getaffinity_np(pthread_self())
for i in xrange(multiprocessing.cpu_count()):
pthread_setaffinity_np(threading.current_thread().ident, [i])
print 'Running on CPU core %d' % i
deadline = time.time() + 5
while time.time() < deadline:
pass
if __name__ == '__main__':
demo()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment