Skip to content

Instantly share code, notes, and snippets.

@leth
Created October 8, 2014 13:07
Show Gist options
  • Save leth/727c43d538768a51c7f8 to your computer and use it in GitHub Desktop.
Save leth/727c43d538768a51c7f8 to your computer and use it in GitHub Desktop.
import weakref
import threading
from twisted.python.threadpool import ThreadPool
from crochet import EventualResult
# (owner object, attribute name) -> attribute value that install_patches()
# replaced; uninstall_patches() uses it to put the original back.
_originals = dict()
# pool -> (min, max) limits captured when the pool's first WorkerThread is
# created.  Weakly keyed, so the mapping itself does not keep a pool alive.
_orig_pool_limits = weakref.WeakKeyDictionary()
# pool -> set of WorkerThreads currently inside the patched
# EventualResult._result call.  Weakly keyed for the same reason.
_blocked_pool_threads = weakref.WeakKeyDictionary()
def install_patches():
    """Monkeypatch crochet and Twisted so blocked pool workers grow the pool.

    Replaces ``EventualResult._result`` with the wrapper built by
    ``make_EventualResult_result`` and ``ThreadPool.threadFactory`` with
    ``threadFactory``.  The values being replaced are recorded in
    ``_originals`` so that ``uninstall_patches`` can restore them.

    Calling this while the patches are already in place is a no-op.  Without
    that guard a second call would record the *patched* attributes as the
    originals and wrap ``_result`` a second time, making a clean uninstall
    impossible.
    """
    # Inspect the class __dict__ instead of using attribute access so the
    # identity check sees the raw function object that was assigned (on
    # Python 2, attribute access on the class would return a method wrapper).
    if vars(ThreadPool).get('threadFactory') is threadFactory:
        return

    _originals[(EventualResult, '_result')] = EventualResult._result
    _originals[(ThreadPool, 'threadFactory')] = ThreadPool.threadFactory
    EventualResult._result = make_EventualResult_result(EventualResult._result)
    # Assigned as a plain function, so ``pool.threadFactory(...)`` binds the
    # pool as the first argument -- which is what threadFactory() expects.
    ThreadPool.threadFactory = threadFactory
def uninstall_patches():
    """Undo ``install_patches`` and restore the recorded pool sizes.

    Every attribute recorded in ``_originals`` is set back on its owner, and
    every pool recorded in ``_orig_pool_limits`` is resized to the
    ``(min, max)`` limits that were captured for it.
    """
    # ``items()`` is available on both Python 2 and Python 3, whereas
    # ``iteritems()`` only exists on Python 2.  The list() snapshots mean the
    # loops never walk a live mapping; _orig_pool_limits is weakly keyed, so
    # its entries can disappear whenever a pool is garbage collected.
    for (obj, name), orig in list(_originals.items()):
        setattr(obj, name, orig)
    # Local names avoid shadowing the ``min``/``max`` builtins.
    for pool, (min_threads, max_threads) in list(_orig_pool_limits.items()):
        pool.adjustPoolsize(min_threads, max_threads)
def threadFactory(pool, *args, **kwargs):
    """Create the thread object for a pool worker.

    :param pool: the pool the new thread will belong to; forwarded to
        ``WorkerThread`` as its first argument.
    :param args: remaining positional arguments, passed through unchanged.
    :param kwargs: keyword arguments, passed through unchanged.
    :return: a new ``WorkerThread`` instance.
    """
    worker = WorkerThread(pool, *args, **kwargs)
    return worker
def make_EventualResult_result(orig):
    """Wrap ``EventualResult._result`` so blocked pool workers enlarge the pool.

    While a ``WorkerThread`` is inside the wrapped call it is counted as
    blocked, and its pool's limits are set to the originally recorded limits
    plus the number of currently blocked workers.  When the call finishes
    (normally or with an exception) the worker is uncounted and the pool is
    resized again, so the limits fall back as workers unblock.  Calls made
    from any other kind of thread go straight to ``orig``.

    :param orig: the original ``_result`` callable to delegate to.
    :return: a replacement method taking ``(self, *args, **kwargs)`` that
        returns or raises whatever ``orig`` does.
    """
    # Makes "update the blocked set, then resize the pool" a single atomic
    # step.  Without it two workers entering or leaving at the same time can
    # apply a stale blocked count to the pool.
    resize_lock = threading.Lock()

    def _sync_pool_size(pool, blocked_threads):
        # Pool limits = recorded original limits + one slot per blocked worker.
        base_min, base_max = _orig_pool_limits[pool]
        extra = len(blocked_threads)
        pool.adjustPoolsize(base_min + extra, base_max + extra)

    def _EventualResult_result(self, *args, **kwargs):
        thread = threading.current_thread()
        if not isinstance(thread, WorkerThread):
            # Not one of our pool workers: nothing to account for.
            return orig(self, *args, **kwargs)

        pool = thread._thread_pool
        blocked_threads = _blocked_pool_threads[pool]
        try:
            with resize_lock:
                blocked_threads.add(thread)
                _sync_pool_size(pool, blocked_threads)
            return orig(self, *args, **kwargs)
        finally:
            # Always uncount this worker and give the extra slot back, even
            # if the resize above or ``orig`` itself raised.
            with resize_lock:
                blocked_threads.discard(thread)
                _sync_pool_size(pool, blocked_threads)

    return _EventualResult_result
class WorkerThread(threading.Thread):
    """Thread subclass that remembers which pool it was created for."""

    def __init__(self, pool, *args, **kwargs):
        """Initialise the thread and register its pool in the module state.

        :param pool: the owning pool; must expose ``min`` and ``max``
            attributes, which are read here.
        :param args: positional arguments for ``threading.Thread.__init__``.
        :param kwargs: keyword arguments for ``threading.Thread.__init__``.
        """
        super(WorkerThread, self).__init__(*args, **kwargs)
        self._thread_pool = pool
        current_limits = (pool.min, pool.max)
        # setdefault() only stores on the first worker seen for a pool, so
        # later workers leave the recorded limits and blocked set untouched.
        _blocked_pool_threads.setdefault(pool, set())
        _orig_pool_limits.setdefault(pool, current_limits)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment