Skip to content

Instantly share code, notes, and snippets.

@piannucci
Last active May 3, 2017 14:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save piannucci/7b045eadf5fa7479026d2283a68d1610 to your computer and use it in GitHub Desktop.
Save piannucci/7b045eadf5fa7479026d2283a68d1610 to your computer and use it in GitHub Desktop.
// One-byte control messages sent over Loop::pipe_fd to the helper thread
// (checker_thread).
enum checker_thread_msg {
  checker_thread_off,   // stop watching the kqueue fd
  checker_thread_on,    // start watching the kqueue fd
  checker_thread_quit   // leave the helper thread's loop
};
// Ties a libuv loop to the calling thread's CFRunLoop. Only the members used
// by the kevent hook and the helper thread are shown; the rest is elided.
class Loop {
...
// The libuv loop whose kevent call is hooked (set from uv_default_loop() in
// the constructor).
uv_loop_t *uvloop;
// Control pipe to the helper thread: checker_thread_msg bytes are written to
// pipe_fd[1] and read by checker_thread from pipe_fd[0].
int pipe_fd[2];
// Helper thread running checker_thread().
uv_thread_t checker;
// Retained CFRunLoop of the constructing thread; checker_thread stops it when
// the kqueue fd becomes readable.
CFRunLoopRef runLoop;
...
};
// Guard ensuring loop_key_init() runs only once (see get_loop_key()).
static uv_once_t loop_key_once = UV_ONCE_INIT;
// libuv thread-local storage key; Loop's constructor stores `this` under it
// and kevent_hook() reads it back.
static uv_key_t loop_key;
// Creates the thread-local storage key in loop_key. Invoked through uv_once()
// from get_loop_key(). Without a valid key kevent_hook() cannot locate its
// Loop, so a creation failure is fatal, like the other setup failures in this
// file.
static void loop_key_init() {
  if (0 != uv_key_create(&loop_key))
    abort();
}
// Returns a pointer to the shared loop_key, making sure loop_key_init() has
// been run (once) before the caller uses it.
static uv_key_t *get_loop_key() {
  uv_key_t *const key = &loop_key;
  uv_once(&loop_key_once, loop_key_init);
  return key;
}
static int kevent_hook(int kq, const void *changelist, int nchanges, void *eventlist, int nevents, const struct timespec *timeout) {
if (!eventlist)
return kevent(kq, changelist, nchanges, eventlist, nevents, timeout);
struct timespec zerotimeout = {0,0};
// Going for a poll. A bit less optimial but we break it into two system calls to make sure
// that the kqueue state is up to date. We might as well also peek since we basically get it
// for free w/ the same call.
int res = kevent(kq, changelist, nchanges, eventlist, nevents, &zerotimeout);
if (res)
return res;
// NOTE(deanm): We only ever make a single pass, because we need to make sure that any user code
// (which could update timers, etc) is reflected and we have a proper timeout value. Since user
// code can run in response to CFRunLoop call-outs, we wind down and go back through the uv loop
// again to make sure to update everything.
Loop *loop = (Loop *)uv_key_get(get_loop_key());
// Have the helper thread start select()ing on the kqueue.
write(loop->pipe_fd[1], (const char[]){checker_thread_on}, 1);
CFRunLoopRunInMode(kCFRunLoopDefaultMode, timeout ? (timeout.tv_sec + timeout.tv_nsec / 1e9) : 9999, 1);
// Stop the helper thread if it hasn't already woken up (in which case it would have already
// stopped itself).
write(loop->pipe_fd[1], (const char[]){checker_thread_off}, 1);
// Do the actual kqueue call now (ignore the timeout, don't block).
res = kevent(kq, NULL, 0, eventlist, nevents, &zerotimeout);
// libuv makes an assert that if it calls kevent without a timeout, it should never return 0.
// One approach is to always have a timer somewhere in libuv, so that the timeout will never be
// indefinite. Hopefully simpler here is just to pretend that the kevent was interrupted.
// Haven't checked how this case is handled in libuv, but seems okay.
if (!timeout && res == 0)
{
errno = EINTR;
return -1;
}
return res;
}
// Body of the helper thread started by Loop's constructor.
//
// Blocks in select() on the read end of loop->pipe_fd and, while enabled, on
// the uv loop's backend (kqueue) fd. When the kqueue fd becomes readable it
// stops loop->runLoop and disables kqueue watching until told otherwise.
// Control bytes from the pipe switch watching on/off or end the thread
// (checker_thread_quit). Unexpected select()/read() failures abort the
// process; interruptions by a signal (EINTR) are retried.
static void checker_thread(Loop *loop) {
  int kqueue_fd = uv_backend_fd(loop->uvloop);
  int check_kqueue = 0;
  while (1) {
    // Rebuild the fd set on every pass: select() overwrites it.
    int nfds = loop->pipe_fd[0] + 1;
    fd_set fds;
    FD_ZERO(&fds);
    FD_SET(loop->pipe_fd[0], &fds);
    if (check_kqueue) {
      FD_SET(kqueue_fd, &fds);
      if (kqueue_fd + 1 > nfds)
        nfds = kqueue_fd + 1;
    }

    int ready = select(nfds, &fds, NULL, NULL, NULL);
    if (ready < 0 && errno == EINTR)
      continue;  // interrupted by a signal; nothing was reported, try again
    if (ready <= 0)
      abort();

    if (FD_ISSET(kqueue_fd, &fds)) {
      // Events are pending on the kqueue: wake the thread sitting in
      // CFRunLoopRunInMode, and stop watching until re-enabled.
      CFRunLoopStop(loop->runLoop);
      check_kqueue = 0;
    }

    if (FD_ISSET(loop->pipe_fd[0], &fds)) {
      char msg;
      ssize_t got;
      do {
        got = read(loop->pipe_fd[0], &msg, 1);
      } while (got < 0 && errno == EINTR);
      if (got != 1)
        abort();
      if (msg == checker_thread_quit)
        break;
      check_kqueue = (msg == checker_thread_on);
    }
  }
}
// Creates the control pipe, installs kevent_hook on the default uv loop,
// retains the current thread's CFRunLoop, starts the helper thread, and
// registers this Loop in the thread-local key so kevent_hook can find it.
// Failures while setting up the pipe or the helper thread abort the process.
Loop::Loop() {
...
  if (0 != pipe(pipe_fd))
    abort();
  uvloop = uv_default_loop();
  uvloop->keventfunc = (void*)&kevent_hook;
  // CFRetain returns an untyped CFTypeRef; cast it back to the run loop type.
  runLoop = (CFRunLoopRef)CFRetain(CFRunLoopGetCurrent());
  // Every field the helper thread reads (uvloop, pipe_fd, runLoop) is set
  // above, before the thread starts.
  if (0 != uv_thread_create(&checker, (uv_thread_cb)checker_thread, (void *)this))
    abort();
  uv_key_set(get_loop_key(), this);
...
}
// Shuts down the helper thread and releases what the constructor acquired.
Loop::~Loop() {
...
// Clear this thread's slot in the TLS key that kevent_hook reads.
// NOTE(review): confirm the hook is uninstalled from the uv loop (or
// tolerates a NULL Loop) before this point; that is not visible here.
uv_key_set(get_loop_key(), NULL);
// Tell the helper thread to exit and wait for it before closing the pipe it
// reads from.
// NOTE(review): the write result is not checked; if the byte is never
// delivered, the join below waits on a thread that was never told to quit.
write(pipe_fd[1], (const char[]){checker_thread_quit}, 1);
uv_thread_join(&checker);
close(pipe_fd[0]);
close(pipe_fd[1]);
// Balances the CFRetain performed in the constructor.
CFRelease(runLoop);
...
}
@piannucci
Copy link
Author

piannucci commented May 3, 2017

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment