@mariusae
Created January 21, 2010 19:25
"""Allow IO-synchronous (blocking) code run ``asynchronously'' in the
tornado IO loop. We accomplish this by using threads & a pipe to
notify the event loop when the synchronous call has finished. This
works because threads yield when waiting for IO.
``AsyncThreadPool'' also passes eventlogs around, so that they are
always set in the right thread context."""
from __future__ import absolute_import
from __future__ import with_statement
import os
import fcntl
import sys
import thread
import time
import copy
import logging
from threading import Thread, Lock, Event, local
from Queue import Queue, Empty
import tornado.web
from django.db import transaction, connection
from tornado import ioloop
from util import eventlog
from util.db import nestable_commit_on_success

class AsyncThreadPool(object):
    """Provide a pool of N threads that executes tasks in worker
    threads and then issues a callback with the results. This can aid
    in adapting synchronous code to asynchronous interfaces.

    If ``db_managed'' is set, we wrap each pool call in a (django)
    database transaction."""

    MONITOR_INTERVAL_SECONDS = 15
    MONITOR_STUCK_THREADS_MULTIPLIER = 0.5
    MONITOR_TRIGGER_STUCK_THREAD_SECONDS = 30

    def __init__(self, nthreads=1, io_loop=None, db_managed=False, debug=False):
        self._io_loop = io_loop or ioloop.IOLoop.instance()
        self.db_managed = db_managed
        self.debug = debug

        rfd, wfd = os.pipe()
        for fd in [rfd, wfd]:
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self._reader = os.fdopen(rfd, 'r', 0)
        self._writer = os.fdopen(wfd, 'w', 0)
        self._io_loop.add_handler(
            rfd, self._handle_ioloop_event, self._io_loop.READ)

        self._queue = Queue()
        self._done_queue = Queue()
        self._threads = []

        # We need a local variable to store whether the current thread
        # is a pool worker or not.
        self._local = local()
        self._local.in_pool = False

        # For each thread, we store what it's working on, for
        # debugging, monitoring, etc.
        self._running = {}
        self._lock = Lock()
        self._join_event = Event()

        for i in range(nthreads):
            t = Thread(target=self._loop)
            t.setDaemon(True)
            t.start()
            self._threads.append(t)

        # Start the monitoring thread.
        t = Thread(target=self._monitor)
        t.setDaemon(True)
        t.start()

    def _handle_ioloop_event(self, fd, events):
        self._reader.read()  # dequeue.
        self._flush_callback_queue()

    def _loop(self):
        self._local.in_pool = True
        while True:
            item = self._queue.get()
            if item is None:
                self._queue.task_done()
                return

            callback, fun, args, kwargs, elog = item
            if self.db_managed:
                connection._commit()
                fun = nestable_commit_on_success(fun)

            with self._lock:
                self._running[thread.get_ident()] = \
                    (time.time(), fun, args, kwargs)

            with elog:
                try:
                    result = (None, fun(*args, **kwargs), None)
                except Exception, e:
                    result = (e, None, sys.exc_info()[2])

            with self._lock:
                del self._running[thread.get_ident()]

            if callback is not None:
                # Put the results on the queue back to the main thread
                # & notify the event loop by writing to our pipe.
                self._done_queue.put((callback, result, elog))
                self._writer.write('.')

            self._queue.task_done()

    def _flush_callback_queue(self):
        while True:
            try:
                callback, result, elog = self._done_queue.get(False)
                with elog:
                    callback(*result)
            except Empty:
                return

    def _monitor(self):
        while not self._join_event.isSet():  ## python2.6: ".is_set()"
            self._commit_suicide_if_threads_are_stuck()
            self._join_event.wait(self.MONITOR_INTERVAL_SECONDS)

    def _commit_suicide_if_threads_are_stuck(self):
        with self._lock:
            num_max_stuck_threads = \
                len(self._threads) * self.MONITOR_STUCK_THREADS_MULTIPLIER
            now = time.time()
            elapseds = [now - t for t, _, _, _ in self._running.values()]
            elapseds = filter(
                lambda d: d > self.MONITOR_TRIGGER_STUCK_THREAD_SECONDS,
                elapseds)
            logging.debug('monitor: thread elapseds = %r' % elapseds)
            nelapsed = len(elapseds)
            if nelapsed:
                logging.info('monitor: %d threads are stuck' % nelapsed)
            if nelapsed >= num_max_stuck_threads:
                logging.info('monitor: too many threads are stuck, '
                             'committing suicide')
                # hard exit.
                os._exit(1)

    def join(self):
        """Join the worker threads & queue, ensuring that all tasks
        are completed. The object is effectively dead after this call."""
        self._queue.join()
        self._join_event.set()

        # Then kill off our worker threads.
        for _ in self._threads:
            self._queue.put(None)
        while self._threads:
            self._threads.pop().join()

        self._io_loop.remove_handler(self._reader.fileno())

        # Issue remaining callbacks.
        self._flush_callback_queue()

        self._reader.close()
        self._writer.close()

    def __call__(self, callback, fun, *args, **kwargs):
        self._queue.put((callback, fun, args, kwargs, eventlog.get_active()))

    def assert_in_pool(self):
        """Return True if the calling thread is a worker in this pool."""
        return getattr(self._local, 'in_pool', False)

    @property
    def status_handler(self):
        POOL = self

        class Handler(tornado.web.RequestHandler):
            def get(self):
                now = time.time()
                with POOL._lock:
                    running = copy.copy(POOL._running.items())
                s = '# ident'.ljust(20) + 'invocation' + '\n'
                for ident, (begintime, fun, args, kwargs) in running:
                    s += str(ident).ljust(20)
                    args = map(repr, args) + \
                        ['%s=%r' % (k, v) for k, v in kwargs.items()]
                    s += '%s(%s)' % (fun.__name__, ', '.join(args))
                    s += ' [for %fs]' % (now - begintime)
                    s += '\n'
                self.set_header('Content-Type', 'text/plain')
                self.finish(s)

        return Handler
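
# ----------------------------------------------------------------------
# Example usage -- a hedged sketch, not part of the original module.
# ``slow_lookup'' is a hypothetical blocking function standing in for
# any synchronous call (an ORM query, an RPC, a filesystem read).
# Callbacks receive the (exception, value, traceback) triple that
# ``_loop'' builds, so the result handler unpacks all three.
#
#   pool = AsyncThreadPool(nthreads=4)
#
#   class LookupHandler(tornado.web.RequestHandler):
#       @tornado.web.asynchronous
#       def get(self):
#           # slow_lookup runs on a worker thread; _on_result runs on
#           # the IO loop thread once the pipe wakes the event loop.
#           pool(self._on_result, slow_lookup, self.get_argument('key'))
#
#       def _on_result(self, exc, value, tb):
#           if exc is not None:
#               raise tornado.web.HTTPError(500)
#           self.finish(value)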