Skip to content

Instantly share code, notes, and snippets.

@zed
Created August 25, 2012 22:42
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zed/3471699 to your computer and use it in GitHub Desktop.
Save zed/3471699 to your computer and use it in GitHub Desktop.
Call a Python callback from a non-Python thread in a C extension module
/c_extension_module*.so
/build/*
/.do_built*
/log
/all.did
# Run the `test` target once for each Python version from 2.4 through 3.3,
# selecting the interpreter through the PYTHON variable read by test.do.
for XY in 2.4 2.5 2.6 2.7 3.0 3.1 3.2 3.3
do
	PYTHON="python$XY" redo test
done
#define Py_LIMITED_API
#define PY_SSIZE_T_CLEAN
// should be before any standard headers
#include "Python.h"
#include <omp.h>
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Call `callback(tid)` and report any exception it raises.
 *
 * Returns 1 if the call failed, 0 otherwise. The caller must hold the GIL
 * (or, in a build without thread support, be inside the omp critical
 * section). PyErr_PrintEx() prints the traceback and clears the error;
 * per its documentation a SystemExit raised by the callback exits the
 * process instead.
 */
static int
invoke_python_callback(PyObject *callback, int tid)
{
    PyObject *result = PyObject_CallFunction(callback, "i", tid);
    if (result == NULL) {
        PyErr_PrintEx(0);
        return 1;
    }
    Py_DECREF(result);
    return 0;
}

/*
 * Per-iteration worker, executed on an OpenMP thread rather than a thread
 * created by Python.
 *
 * Invokes the Python callable `python_callback` with a single int argument:
 * this thread's OpenMP thread number. With thread support (WITH_THREAD) the
 * GIL is taken via the PyGILState API around the call; without it, calls
 * into Python are serialized by an OpenMP critical section instead.
 *
 * Returns NULL if the callback raised, otherwise `python_callback` itself
 * (the non-NULL value only signals success).
 */
static void*
non_python_thread(void *python_callback) {
    const int tid = omp_get_thread_num();
    int failed;
#ifdef WITH_THREAD
    /* PyGILState_Ensure() creates a Python thread state for this foreign
       OS thread if needed and acquires the GIL. */
    PyGILState_STATE gil_state = PyGILState_Ensure();
    failed = invoke_python_callback((PyObject*)python_callback, tid);
    PyGILState_Release(gil_state);
#else
    /* No GIL exists in a threadless build: let one thread into Python at a time. */
#pragma omp critical
    failed = invoke_python_callback((PyObject*)python_callback, tid);
#endif
    return failed ? NULL : python_callback;
}
/*
 * Python-visible spawn_non_python_thread(callback, N).
 *
 * Runs an OpenMP parallel loop of N iterations on a team of N threads.
 * Every iteration calls non_python_thread(), which invokes callback(tid)
 * from that (non-Python) OpenMP thread. The GIL is released around the
 * parallel region so that the workers can acquire it.
 *
 * Returns None if every call succeeded. Otherwise it sets RuntimeError
 * ("callback failed") and returns NULL; each callback's own exception has
 * already been printed by the worker. N <= 0 makes no calls and returns
 * None. Argument errors are whatever PyArg_ParseTuple raises for "Oi".
 */
static PyObject *
spawn_non_python_thread(PyObject * self_unused, PyObject *args) {
    PyObject* ret;
    PyObject* python_callback;
    int i, N, error_flag = 0;
    /* it should be safe to call PyEval_InitThreads(): either current
       function holds GIL or it is started from the main thread (and
       there is no GIL yet). Since Python 3.9 the call is deprecated and
       does nothing, because the interpreter creates the GIL itself. */
#ifdef WITH_THREAD
    PyEval_InitThreads();
#endif
    if (!PyArg_ParseTuple(args, "Oi:spawn_non_python_thread",
                          &python_callback, &N))
        return NULL; // propagate exception
    Py_INCREF(python_callback); // hold on to it until threads are finished
    Py_BEGIN_ALLOW_THREADS
    /* OpenMP requires a positive thread count. With N <= 0 the loop would
       run zero iterations anyway, so the parallel region is skipped. The
       num_threads clause (unlike omp_set_num_threads) sizes only this
       region and leaves the process-wide OpenMP default untouched. */
    if (N > 0) {
        /* reduction(||) gives each thread a private error_flag and ORs them
           together at the end. Unsynchronized writes to one shared int
           would be a data race. */
#pragma omp parallel for num_threads(N) reduction(||:error_flag)
        for (i = 0; i < N; ++i) {
            if (!non_python_thread(python_callback))
                error_flag = 1;
        }
    }
    Py_END_ALLOW_THREADS
    // check whether non_python_thread failed
    if (error_flag) {
        PyErr_SetString(PyExc_RuntimeError, "callback failed");
        ret = NULL;
    } else {
        Py_INCREF(Py_None);
        ret = Py_None;
    }
    Py_DECREF(python_callback);
    return ret;
}
/* Method table of the module; the all-NULL entry terminates it. */
static PyMethodDef
module_functions[] = {
    { "spawn_non_python_thread", spawn_non_python_thread,
      METH_VARARGS,
      "spawn_non_python_thread(callback, N)\n"
      "\n"
      "Run an OpenMP parallel loop of N iterations; each iteration calls\n"
      "callback(tid), where tid is the OpenMP thread number, from that\n"
      "non-Python thread. Exceptions raised by callback are printed.\n"
      "Returns None, or raises RuntimeError if any call failed." },
    { NULL, NULL, 0, NULL }
};
// Python 2/3 compatibility shims for the module init function, following
// http://python3porting.com/cextensions.html
//   MOD_INIT(name)                   declares the init function:
//                                    PyInit_<name> (returns the module) on
//                                    Python 3, init<name> (returns void) on
//                                    Python 2.
//   MOD_DEF(ob, name, doc, methods)  creates the module object and assigns
//                                    it to `ob`.
//   MOD_ERROR_VAL, MOD_SUCCESS_VAL(val)
//                                    value returned by the init function on
//                                    failure / success; both expand to
//                                    nothing on Python 2 (void function).
#if PY_MAJOR_VERSION >= 3
#define MOD_ERROR_VAL NULL
#define MOD_SUCCESS_VAL(val) val
#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
// MOD_DEF expands to a static PyModuleDef declaration followed by an
// assignment statement, so it cannot be wrapped in do { } while (0).
// m_size = -1: per the Python docs, the module keeps state in globals and
// does not support sub-interpreters.
#define MOD_DEF(ob, name, doc, methods) \
static struct PyModuleDef moduledef = { \
PyModuleDef_HEAD_INIT, name, doc, -1, methods, }; \
ob = PyModule_Create(&moduledef);
#else
#define MOD_ERROR_VAL
#define MOD_SUCCESS_VAL(val)
#define MOD_INIT(name) void init##name(void)
// Py_InitModule3 returns a borrowed reference to the new module (NULL on error).
#define MOD_DEF(ob, name, doc, methods) \
ob = Py_InitModule3(name, methods, doc);
#endif
/*
 * Module initializer. It creates the module through the MOD_DEF
 * compatibility macro. On Python 3 it returns the new module, or NULL with
 * an exception set. On Python 2 the function returns nothing.
 */
MOD_INIT(c_extension_module)
{
    PyObject *m = NULL;
    MOD_DEF(m, "c_extension_module",
            "Demonstrates calling a Python callback from non-Python "
            "(OpenMP) threads.",
            module_functions)
    if (m == NULL)
        return MOD_ERROR_VAL;
    return MOD_SUCCESS_VAL(m);
}
#ifdef __cplusplus
}
#endif
# Delete everything git does not track. -f forces, -d includes untracked
# directories, and -x also removes ignored files (the build products listed
# in .gitignore, such as the .so, build/ and .do_built*).
git clean -fdx
#!/bin/sh
#
# A minimal alternative to djb redo that doesn't support incremental builds.
# For the full version, visit http://github.com/apenwarr/redo
#
# The author disclaims copyright to this source file and hereby places it in
# the public domain. (2010 12 14)
#
# Progress messages are colorized only when TERM is set to something other
# than "dumb" and stderr is a terminal; otherwise the escapes stay empty.
green= bold= plain=
if [ -n "$TERM" ] && [ "$TERM" != "dumb" ] && tty <&2 >/dev/null 2>&1; then
	green=$(printf '\033[32m')
	bold=$(printf '\033[1m')
	plain=$(printf '\033[m')
fi
# Split path $1 into the globals `dir` and `base`. `dir` is everything up to
# and including the last '/' (empty if there is none); `base` is the rest.
# Examples: "a/b/c.do" -> dir="a/b/", base="c.do"; "c.do" -> dir="", base="c.do".
# "$base" is quoted in the suffix removal so that glob characters in file
# names ('*', '?', '[') are matched literally rather than as a pattern.
_dirsplit()
{
	base=${1##*/}
	dir=${1%"$base"}
}
# Print the directory part of $1 without its trailing slash, or "." when $1
# has no directory part. The body is a subshell ( ... ), so the dir/base
# globals set by _dirsplit do not leak to the caller. This function shadows
# the external dirname(1) command for the rest of the script.
# NOTE(review): for a path directly under / (e.g. "/foo") this prints "."
# rather than "/".
dirname()
(
_dirsplit "$1"
dir=${dir%/}
echo "${dir:-.}"
)
# Absolute path of this script; the redo/redo-ifchange symlinks below point at it.
_dirsplit "$0"
export REDO=$(cd "${dir:-.}" && echo "$PWD/$base")
DO_TOP=
# DO_BUILT is exported below, so it is empty only in the outermost invocation.
if [ -z "$DO_BUILT" ]; then
DO_TOP=1
[ -n "$*" ] || set all # only toplevel redo has a default target
# .do_built lists every target built by earlier runs. Those targets and their
# .did stamps are deleted, so this minimal redo always rebuilds from scratch.
# The deduplicated list is written back to .do_built.
# NOTE(review): `read f` without -r mangles backslashes in file names.
export DO_BUILT=$PWD/.do_built
: >>"$DO_BUILT"
echo "Removing previously built files..." >&2
sort -u "$DO_BUILT" | tee "$DO_BUILT.new" |
while read f; do printf "%s\0%s.did\0" "$f" "$f"; done |
xargs -0 rm -f 2>/dev/null
mv "$DO_BUILT.new" "$DO_BUILT"
# Make the redo commands used by .do scripts available on PATH:
# redo and redo-ifchange run this script, while redo-ifcreate, redo-stamp
# and redo-always are no-ops (`true`), since there are no incremental builds.
DO_PATH=$DO_BUILT.dir
export PATH=$DO_PATH:$PATH
rm -rf "$DO_PATH"
mkdir "$DO_PATH"
for d in redo redo-ifchange; do
ln -s "$REDO" "$DO_PATH/$d";
done
[ -e /bin/true ] && TRUE=/bin/true || TRUE=/usr/bin/true
for d in redo-ifcreate redo-stamp redo-always; do
ln -s $TRUE "$DO_PATH/$d";
done
fi
# Find the most specific default*.do file in $PWD for target $1. It tries
# default.<suffix>.do for successively shorter dotted suffixes of $1 (for
# foo.c.o: default.c.o.do, then default.o.do) and finishes at default.do.
# The exact-name "$1.do" is not checked here. Sets these globals:
#   dofile  the first existing candidate, or default.do (which may not exist)
#   ext     the matched suffix, e.g. ".o" (empty for default.do)
#   base    $1 with ext removed
_find_dofile_pwd()
{
dofile=default.$1.do
while :; do
# Drop the leading "default.<first component>." and put "default." back on.
dofile=default.${dofile#default.*.}
[ -e "$dofile" -o "$dofile" = default.do ] && break
done
ext=${dofile#default}
ext=${ext%.do}
base=${1%$ext}
}
# Locate a default*.do file for target $1. It searches $PWD first, then each
# parent directory up to /, cd'ing upward as it goes (see _find_dofile_pwd).
# At each step up, the name of the directory just left is prepended to
# `target` and `tmp`, so they stay valid relative to the new $PWD; `base`
# gets the same prefix at the end. `target` and `tmp` are the locals of the
# calling _do, reached through the dynamic scoping of `local` in bash/dash.
# On return `dofile` may still name a missing file; the caller checks.
_find_dofile()
{
local prefix=
while :; do
_find_dofile_pwd "$1"
[ -e "$dofile" ] && break
[ "$PWD" = "/" ] && break
target=${PWD##*/}/$target
tmp=${PWD##*/}/$tmp
prefix=${PWD##*/}/$prefix
cd ..
done
base=$prefix$base
}
# Run the do file $dofile (relative to $PWD) for $target. _do calls this in a
# subshell, so `set -e` and the exports below do not affect the parent.
# Its arguments are passed to the do script as $1 $2 $3: target, base name
# (target minus the matched extension) and temporary output file. The
# script's stdout is captured in "$tmp.tmp2".
_run_dofile()
{
# One extra space of indentation per nesting level in progress messages.
export DO_DEPTH="$DO_DEPTH "
export REDO_TARGET=$PWD/$target
local line1
set -e
read line1 <"$PWD/$dofile" || true
cmd=${line1#"#!/"}
if [ "$cmd" != "$line1" ]; then
# First line is a "#!/..." shebang: run that interpreter (its arguments are
# word-split) on the do file plus the three arguments.
/$cmd "$PWD/$dofile" "$@" >"$tmp.tmp2"
else
# No shebang: source the do file into this shell, so it sees $1 $2 $3.
# NOTE(review): the purpose of the leading ':' no-op is not evident here
# (presumably it resets $? before sourcing); confirm before removing it.
:; . "$PWD/$dofile" >"$tmp.tmp2"
fi
}
# Build a single target unless it already exists.
#   $1  directory prefix of the target, used only in progress/error messages
#   $2  target name, relative to $PWD
#   $3  flat base name for temporary output files ("$3.tmp", "$3.tmp2")
# A directory target counts as built only once its "<target>.did" stamp exists.
# Returns 1 if no do file is found, or the do script's non-zero exit code
# if it fails.
_do()
{
local dir=$1 target=$2 tmp=$3
if [ ! -e "$target" ] || [ -d "$target" -a ! -e "$target.did" ]; then
printf '%sdo %s%s%s%s\n' \
"$green" "$DO_DEPTH" "$bold" "$dir$target" "$plain" >&2
# Record the target so the next top-level run deletes it before rebuilding.
echo "$PWD/$target" >>"$DO_BUILT"
# Prefer "<target>.do". Otherwise search for a default*.do here and in the
# parent directories; that search may cd upward and rewrite target/tmp/base.
dofile=$target.do
base=$target
ext=
[ -e "$target.do" ] || _find_dofile "$target"
if [ ! -e "$dofile" ]; then
echo "do: $target: no .do file" >&2
return 1
fi
# Create the .did stamp when the target's directory exists. The stamps are
# removed again at the end of the top-level run.
[ ! -e "$DO_BUILT" ] || [ ! -d "$(dirname "$target")" ] ||
: >>"$target.did"
# Run in a subshell so the do script's set -e, cd and exports stay contained.
( _run_dofile "$target" "$base" "$tmp.tmp" )
rv=$?
if [ $rv != 0 ]; then
printf "do: %s%s\n" "$DO_DEPTH" \
"$dir$target: got exit code $rv" >&2
rm -f "$tmp.tmp" "$tmp.tmp2"
return $rv
fi
# Install the result. Use the file the script wrote to $3 if there is one;
# otherwise use its captured stdout if non-empty; otherwise create no target.
mv "$tmp.tmp" "$target" 2>/dev/null ||
! test -s "$tmp.tmp2" ||
mv "$tmp.tmp2" "$target" 2>/dev/null
rm -f "$tmp.tmp2"
else
echo "do $DO_DEPTH$target exists." >&2
fi
}
# Make corrections for directories that don't actually exist yet.
# Starting from directory $1 and name $2, walk up while xdir is non-empty
# and does not exist. Each missing path component moves from xdir into the
# target name. Results are left in these globals:
#   xdir      nearest existing ancestor directory (or empty)
#   xbase     target path relative to xdir, components joined with '/'
#   xbasetmp  the same components joined with '__': a flat name for temp
#             files, which are created inside the existing directory xdir
# `local dir base` keeps _dirsplit from clobbering the caller's globals.
_dir_shovel()
{
	local dir base
	xdir=$1 xbase=$2 xbasetmp=$2
	while [ ! -d "$xdir" -a -n "$xdir" ]; do
		_dirsplit "${xdir%/}"
		# Build from xbasetmp, not xbase, so the temp name stays free of '/'
		# when several directory levels are missing.
		xbasetmp=${base}__$xbasetmp
		xdir=$dir xbase=$base/$xbase
	done
}
# Build every target named in the arguments, in order. Each argument is
# split into directory and name, adjusted by _dir_shovel for directories
# that do not exist yet, and built by _do in a subshell cd'd into the
# directory. Stops at the first failure and returns 1.
_redo()
{
	set +e
	for i in "$@"; do
		_dirsplit "$i"
		_dir_shovel "$dir" "$base"
		dir=$xdir base=$xbase basetmp=$xbasetmp
		( cd "$dir" && _do "$dir" "$base" "$basetmp" ) || return 1
	done
}
# Build the requested targets; exit with status 1 if any of them fails.
_redo "$@" || exit 1
# Only the outermost redo removes the .did stamp of every recorded target.
if [ -n "$DO_TOP" ]; then
	echo "Removing stamp files..." >&2
	if [ -e "$DO_BUILT" ]; then
		while read f; do printf "%s.did\0" "$f"; done <"$DO_BUILT" |
			xargs -0 rm -f 2>/dev/null
	fi
fi
import c_extension_module
def python_callback(tid):
    """Callback handed to the C extension; receives an OpenMP thread number."""
    message = "python callback called %d" % (tid,)
    print(message)
def python_thread(tid=-1):
    """Body of the ordinary Python thread started alongside the OpenMP threads.

    ``tid`` is only a label for the printed messages. The default -1 marks
    the Python-created thread, as opposed to the OpenMP thread numbers that
    python_callback receives. ``tid`` used to be an undefined name here, so
    the thread always died with NameError.
    """
    print("python thread started %d" % tid)
    print("python thread ended %d" % tid)
# Importing `threading` might implicitly call PyEval_InitThreads() (creating
# the GIL) on some Python versions, which is part of what this script exercises.
try:
    import threading
except ImportError:
    # Threadless Python build. There is no thread id to report here (the old
    # `% tid` raised NameError). The message goes to stderr because test.do
    # discards main.py's stdout and greps stderr for "threadless".
    import sys
    sys.stderr.write("threadless python\n")
else:
    threading.Thread(target=python_thread).start()
# The GIL may not exist yet (threadless build, or `threading` did not create
# it); spawn_non_python_thread() calls PyEval_InitThreads() before it
# releases the GIL for the OpenMP threads.
c_extension_module.spawn_non_python_thread(python_callback, 4)
print("exit main thread")
from distutils.core import setup
from distutils.extension import Extension
# The extension uses OpenMP, so -fopenmp is passed both when compiling
# (to honor the `#pragma omp` directives) and when linking (to pull in the
# OpenMP runtime). The flag spelling is GCC/Clang-style.
extension = Extension(
    "c_extension_module",
    sources=["c_extension_module.c"],
    extra_compile_args=["-fopenmp"],
    extra_link_args=["-fopenmp"],
)
setup(name='Test python callback in a non-python thread',
      ext_modules=[extension])
# Build the extension, then run main.py up to 1001 times. Stop as soon as
# its stderr shows an expected failure (RuntimeError, KeyError, an uncaught
# exception) or reports a threadless Python build.
# Only POSIX constructs are used: /bin/sh may be dash, whose echo prints
# "-n" literally; `seq` is not POSIX; and `\|` in a basic regex is a GNU extension.
exec >&2
: ${PYTHON:=python}
$PYTHON -V
# build C extension
$PYTHON setup.py build_ext --inplace --force >/dev/null
# run: main.py's stdout is discarded; its stderr is saved to `log` and scanned
count=0
while [ "$count" -le 1000 ]
do
	printf .
	if $PYTHON main.py 2>&1 >/dev/null | tee log |
		grep -E 'RuntimeError|KeyError|threadless|Exception'
	then
		break
	fi
	count=$((count + 1))
done
cat log
@liuyu81
Copy link

liuyu81 commented Aug 26, 2012

The KeyError() and deadlock cases can be avoided, check this out https://gist.github.com/3473376

@liuyu81
Copy link

liuyu81 commented Aug 27, 2012

I came up with a simple workaround that can hide dirty tricks from end users (https://gist.github.com/3473376). We can import the threading module in the initializer of c_extension_module, i.e. PyImport_ImportModule("threading"). This can enforce Python's MainThread to be correctly logged by the threading module thus avoiding the KeyError issue. On OpenMP threads, my guess is that omp threads may have some special synchronization activities when initialized, and these additional activities forced Python's MainThread to schedule so that it is always captured by the standard threading module.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment