Created
January 21, 2010 19:25
-
-
Save mariusae/283103 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Allow IO-synchronous (blocking) code to run ``asynchronously'' in the
tornado IO loop. We accomplish this by using threads & a pipe to
notify the event loop when the synchronous call has finished. This
works because threads yield when waiting for IO.

``AsyncThreadPool'' also passes eventlogs around, so that they are
always set in the right thread context."""
from __future__ import absolute_import | |
from __future__ import with_statement | |
import os | |
import fcntl | |
import sys | |
import thread | |
import time | |
import copy | |
import logging | |
from threading import Thread, Lock, Event, local | |
from Queue import Queue, Empty | |
import tornado.web | |
from django.db import transaction, connection | |
from tornado import ioloop | |
from util import eventlog | |
from util.db import nestable_commit_on_success | |
class AsyncThreadPool(object):
    """Provide a pool of N threads that executes tasks in worker
    threads and then issues a callback with the results. This can aid
    in adapting synchronous code to asynchronous interfaces.

    If ``db_managed'' is set, we wrap each pool call in a (django)
    database transaction.

    Completion is signalled to the tornado IO loop through a pipe:
    workers write one byte per finished task, and the IO-loop handler
    drains the pipe and runs the queued callbacks on the loop thread.

    A monitor thread periodically checks for workers stuck on a single
    task; if ``enough'' of the pool is stuck (see the MONITOR_*
    constants) the whole process is killed via os._exit(1)."""

    # Seconds between stuck-thread checks by the monitor thread.
    MONITOR_INTERVAL_SECONDS = 15
    # Fraction of the pool that must be stuck before we exit the process.
    MONITOR_STUCK_THREADS_MULTIPLIER = 0.5
    # A worker busy on one task longer than this is considered stuck.
    MONITOR_TRIGGER_STUCK_THREAD_SECONDS = 30

    def __init__(self, nthreads=1, io_loop=None, db_managed=False, debug=False):
        # Default to the process-wide singleton IO loop.
        self._io_loop = io_loop or ioloop.IOLoop.instance()
        self.db_managed = db_managed
        self.debug = debug
        # Wakeup channel: workers write to wfd when a task completes;
        # the IO loop watches rfd and drains callbacks on readability.
        rfd, wfd = os.pipe()
        for fd in [rfd, wfd]:
            # Both ends non-blocking so neither side can stall on the pipe.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # Unbuffered (bufsize=0) wrappers so writes are visible immediately.
        self._reader = os.fdopen(rfd, 'r', 0)
        self._writer = os.fdopen(wfd, 'w', 0)
        self._io_loop.add_handler(
            rfd, self._handle_ioloop_event, self._io_loop.READ)
        self._queue = Queue()       # tasks pending execution by workers
        self._done_queue = Queue()  # finished results awaiting callbacks
        self._threads = []
        # We need a local variable to store whether the current thread
        # is a pool worker or not.
        self._local = local()
        self._local.in_pool = False
        # For each thread, we store what it's working on, for
        # debugging, monitoring, etc.  Maps thread ident ->
        # (start_time, fun, args, kwargs); guarded by self._lock.
        self._running = {}
        self._lock = Lock()
        self._join_event = Event()
        # Workers are daemons so they never block interpreter exit.
        for i in range(nthreads):
            t = Thread(target=self._loop)
            t.setDaemon(True)
            t.start()
            self._threads.append(t)
        # Start the monitoring thread.
        t = Thread(target=self._monitor)
        t.setDaemon(True)
        t.start()

    def _handle_ioloop_event(self, fd, events):
        """Pipe-readable handler; runs on the IO loop thread.

        Drain the wakeup bytes (one '.' per finished task), then run
        every queued callback."""
        self._reader.read()  # dequeue.
        self._flush_callback_queue()

    def _loop(self):
        """Worker main loop: pull tasks until a ``None'' sentinel arrives.

        Each result is a (exception, value, traceback) triple so the
        callback can distinguish success from failure."""
        self._local.in_pool = True
        while True:
            item = self._queue.get()
            # ``None'' is the shutdown sentinel enqueued by join().
            if item is None:
                self._queue.task_done()
                return
            callback, fun, args, kwargs, elog = item
            if self.db_managed:
                # Commit first so this task starts on a fresh transaction,
                # then wrap it so its own work commits on success.
                connection._commit()
                fun = nestable_commit_on_success(fun)
            # Record what we're working on (and since when) for the
            # monitor thread and the status handler.
            with self._lock:
                self._running[thread.get_ident()] = \
                    (time.time(), fun, args, kwargs)
            # Run the task with the caller's eventlog active in this thread.
            with elog:
                try:
                    result = (None, fun(*args, **kwargs), None)
                except Exception, e:
                    # Capture the traceback alongside the exception so the
                    # callback can re-raise with full context.
                    result = (e, None, sys.exc_info()[2])
            with self._lock:
                del self._running[thread.get_ident()]
            if callback is not None:
                # Put the results on queue back to the main thread &
                # notify the eventloop via writing to our pipe.
                self._done_queue.put((callback, result, elog))
                self._writer.write('.')
            self._queue.task_done()

    def _flush_callback_queue(self):
        """Run all pending callbacks (non-blocking drain of _done_queue).

        Each callback fires with the originating eventlog re-activated,
        and is invoked as callback(exception, value, traceback)."""
        while True:
            try:
                callback, result, elog = self._done_queue.get(False)
                with elog:
                    callback(*result)
            except Empty:
                return

    def _monitor(self):
        """Monitor thread: periodically check for stuck workers until
        join() sets the join event."""
        while not self._join_event.isSet(): ## python2.6: ".is_set()"
            self._commit_suicide_if_threads_are_stuck()
            # wait() doubles as an interruptible sleep: join() wakes us.
            self._join_event.wait(self.MONITOR_INTERVAL_SECONDS)

    def _commit_suicide_if_threads_are_stuck(self):
        """Hard-exit the process (os._exit) when at least
        MONITOR_STUCK_THREADS_MULTIPLIER of the pool has been busy on a
        single task for over MONITOR_TRIGGER_STUCK_THREAD_SECONDS."""
        with self._lock:
            num_max_stuck_threads = \
                len(self._threads) * self.MONITOR_STUCK_THREADS_MULTIPLIER
            now = time.time()
            # How long each currently-running task has been executing.
            elapseds = [now - t for t, _, _, _ in self._running.values()]
            elapseds = filter(
                lambda d: d > self.MONITOR_TRIGGER_STUCK_THREAD_SECONDS,
                elapseds)
            logging.debug('monitor: thread elapseds = %r' % elapseds)
            nelapsed = len(elapseds)
            if nelapsed:
                logging.info('monitor: %d threads are stuck' % nelapsed)
            if nelapsed >= num_max_stuck_threads:
                logging.info('monitor: too many threads are stuck, '
                             'committing suicide')
                # hard exit.
                os._exit(1)

    def join(self):
        """Join the worker threads & queue, ensuring that all tasks
        are completed. The object is effectively dead after this call."""
        # Wait for every queued task to be task_done()'d first.
        self._queue.join()
        self._join_event.set()
        # Then kill of our worker threads.
        # One ``None'' sentinel per worker makes each exit its loop.
        for _ in self._threads:
            self._queue.put(None)
        while self._threads:
            self._threads.pop().join()
        self._io_loop.remove_handler(self._reader.fileno())
        # Issue remaining callbacks.
        self._flush_callback_queue()
        self._reader.close()
        self._writer.close()

    def __call__(self, callback, fun, *args, **kwargs):
        """Schedule fun(*args, **kwargs) on a worker thread; ``callback''
        (may be None) later receives (exception, value, traceback) on
        the IO loop thread.  The currently active eventlog travels with
        the task."""
        self._queue.put((callback, fun, args, kwargs, eventlog.get_active()))

    def assert_in_pool(self):
        """Assert that we're in this async pool."""
        # NOTE(review): despite the name, this returns a bool rather than
        # asserting; the getattr default covers threads that never had
        # ``in_pool'' set on this thread-local.
        return getattr(self._local, 'in_pool', False)

    @property
    def status_handler(self):
        """A tornado RequestHandler class that renders, as plain text,
        what each worker thread is currently running and for how long."""
        POOL = self
        class Handler(tornado.web.RequestHandler):
            def get(self):
                now = time.time()
                # Snapshot under the lock so workers can't mutate
                # _running while we format.  (In Python 2, .items()
                # already returns a fresh list; copy.copy is belt-and-
                # braces.)
                with POOL._lock:
                    running = copy.copy(POOL._running.items())
                s = '# ident'.ljust(20) + 'invocation' + '\n'
                for ident, (begintime, fun, args, kwargs) in running:
                    s += str(ident).ljust(20)
                    args = map(repr, args) + \
                        ['%s=%r' % (k, v) for k, v in kwargs.items()]
                    s += '%s(%s)' % (fun.__name__, ', '.join(args))
                    s += ' [for %fs]' % (now - begintime)
                    s += '\n'
                self.set_header('Content-Type', 'text/plain')
                self.finish(s)
        return Handler
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment