Skip to content

Instantly share code, notes, and snippets.

@serialx
Created November 15, 2011 03:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save serialx/1366053 to your computer and use it in GitHub Desktop.
Save serialx/1366053 to your computer and use it in GitHub Desktop.
gevent gethreading
import gevent, gevent.event
import threading, Queue, collections, time, functools
def _threads_poller_f():
    """Greenlet body that hands worker-thread results back to their callers.

    Keeps running for as long as ``_OsThread._threads_count`` is non-zero.
    Every ``(job, value, is_exception)`` triple taken from
    ``_OsThread._threads_results`` is applied to ``job._async_result``: the
    value is stored with ``set()``, or with ``set_exception()`` when the
    worker reported a failure.  When no result is queued the greenlet
    yields through ``gevent.sleep()`` and polls again.
    """
    while _OsThread._threads_count:
        try:
            job, value, failed = _OsThread._threads_results.get_nowait()
        except Queue.Empty:
            # Nothing finished yet: let other greenlets run, then poll again.
            gevent.sleep()
            continue
        if failed:
            job._async_result.set_exception(value)
        else:
            job._async_result.set(value)
class _OsThread(threading.Thread):
    """Pooled daemon OS thread that runs queued calls one at a time.

    Jobs are objects exposing ``func``, ``args`` and ``kwargs``; they are put
    on the thread's ``inq`` queue.  After each call the worker pushes
    ``(job, value, is_exception)`` onto the shared ``_threads_results`` queue,
    which ``_threads_poller_f`` drains.

    Life cycle: ``acquire()`` hands out a thread (reusing one from the free
    list when possible), ``release()`` puts it back on the free list and
    ``stop()`` retires it.  A worker that sits in the free list while its
    queue stays empty for ``_threads_timeout`` seconds retires itself.
    Instances are context managers that call ``release()`` on exit.
    """

    # Number of threads currently handed out (acquired, not released/stopped).
    _threads_count = 0
    # Seconds each worker sleeps before every poll of its input queue.
    _threads_sleep_interval = 0.008
    # Seconds a worker blocks on its input queue before checking for idleness.
    _threads_timeout = 3
    # Greenlet returned by gevent.spawn_raw for the result poller.
    _threads_poller = None
    # Shared queue of (job, value, is_exception) triples produced by workers.
    _threads_results = Queue.Queue()
    # Released threads available for reuse, oldest first.
    _free_threads = collections.deque()
    # Guards _free_threads, _threads_count and the free/stopped flags.
    _free_threads_lock = threading.RLock()

    def __init__(self):
        "Do not instantiate _OsThread objects directly. Use acquire instead."
        threading.Thread.__init__(self)
        self.daemon = True
        self.stopped = False
        # Defined up front so release()/stop() never hit a missing attribute.
        self.free = False
        self.inq = Queue.Queue()

    def _init(self):
        """Mark the thread as in use and count it.

        Spawns the result poller when this is the first thread in use.
        ``acquire()`` calls this while holding ``_free_threads_lock``.
        """
        self.free = False
        _OsThread._threads_count += 1
        if _OsThread._threads_count == 1:
            _OsThread._threads_poller = gevent.spawn_raw(_threads_poller_f)

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.release()

    @classmethod
    def acquire(cls):
        """Return a thread that is ready for jobs.

        The oldest free thread is reused when there is one; otherwise a new
        thread is created and started.
        """
        with _OsThread._free_threads_lock:
            try:
                thread = _OsThread._free_threads.popleft()
            except IndexError:
                thread = cls()
                thread._init()
                thread.start()
            else:
                # Kept out of the try block so an IndexError raised while
                # initialising is not mistaken for an empty free list.
                thread._init()
        return thread

    def release(self):
        """Return the thread to the free list; returns ``self``.

        Does nothing when the thread is already free or has been stopped
        (explicitly or by its own idle timeout).
        """
        with _OsThread._free_threads_lock:
            if not self.free and not self.stopped:
                _OsThread._threads_count -= 1
                self.free = True
                _OsThread._free_threads.append(self)
        return self

    def stop(self):
        """Retire the thread and wake its loop with a ``None`` job; returns ``self``.

        Repeated calls, and calls on a thread that already retired itself,
        are no-ops so ``_threads_count`` is never decremented twice.
        """
        with _OsThread._free_threads_lock:
            if self.stopped:
                return self
            self.stopped = True
            if self.free:
                # Already uncounted by release(); just drop it from the pool.
                self.free = False
                _OsThread._free_threads.remove(self)
            else:
                _OsThread._threads_count -= 1
        self.inq.put(None)
        return self

    # Overridden
    def run(self):
        """Worker loop: execute queued jobs until stopped or idle for too long."""
        while True:
            time.sleep(_OsThread._threads_sleep_interval)
            try:
                obj = self.inq.get(timeout=_OsThread._threads_timeout)
            except Queue.Empty:
                with _OsThread._free_threads_lock:
                    if self.free:
                        # Idle in the pool: leave it and retire.  Marking the
                        # thread stopped keeps a late release()/stop() from
                        # re-queueing or re-counting a dead thread.
                        self.free = False
                        self.stopped = True
                        _OsThread._free_threads.remove(self)
                        break
                # Still held by a caller; keep waiting for work.
                continue
            if obj is None:
                # Sentinel queued by stop().
                break
            rv = None
            isexc = False
            try:
                rv = obj.func(*obj.args, **obj.kwargs)
            except Exception as e:
                rv = e
                isexc = True
            finally:
                # Always report, so the waiting caller is woken up.
                _OsThread._threads_results.put((obj, rv, isexc))
# Module-level alias for _OsThread.acquire.  The returned thread is a context
# manager, so it can be used as ``with osthread_acquire() as t: ...``.
osthread_acquire = _OsThread.acquire
class inthread(object):
    """Run a callable in an OS thread and return its result to the caller.

    ``inthread(func, *args, **kwargs)`` acquires an ``_OsThread``, queues the
    call on it, waits on a gevent ``AsyncResult`` and releases the thread
    once the wait ends.

    ``inthread(thread, func, *args, **kwargs)`` queues the call on the given
    ``_OsThread`` instead; releasing that thread is left to the caller.

    "Instantiating" the class returns whatever ``AsyncResult.get()`` yields
    (the call's result), not an ``inthread`` object.  The temporary instance
    is the job record handed to the worker thread.
    """

    def __new__(cls, f, *args, **kwargs):
        job = object.__new__(cls)
        caller_owns_thread = isinstance(f, _OsThread)
        if caller_owns_thread:
            # First positional argument after the thread is the callable.
            job.func, job.args = args[0], args[1:]
        else:
            job.func, job.args = f, args
        job.kwargs = kwargs
        job._async_result = gevent.event.AsyncResult()
        job.thread = f if caller_owns_thread else _OsThread.acquire()
        job.thread.inq.put(job)
        if caller_owns_thread:
            return job._async_result.get()
        try:
            return job._async_result.get()
        finally:
            # The thread was acquired here, so hand it back to the pool.
            job.thread.release()

    @classmethod
    def wrap(cls, func):
        """Return a version of *func* whose every call goes through ``cls``."""
        @functools.wraps(func)
        def wrapped(*call_args, **call_kwargs):
            return cls(func, *call_args, **call_kwargs)
        return wrapped
import gethreading
import gevent
import time, random, threading
def get_running_threads():
    """Return the set of live threads not named 'MainThread' or 'SockThread'."""
    ignored_names = ('MainThread', 'SockThread')
    running = set()
    for th in threading.enumerate():
        if th.name not in ignored_names:
            running.add(th)
    return running
def wait(secs):
    """Block the calling OS thread for *secs* seconds, then return that thread.

    The returned thread object lets the caller see which thread ran the call.
    """
    time.sleep(secs)
    executing_thread = gethreading.threading.current_thread()
    return executing_thread
def f(x):
    """Demo job: run two random-length waits in OS threads and trace progress.

    Prints a start/middle/end line tagged with *x*; the last line names the
    threads that executed the two waits.
    """
    print("f({}) start".format(x))
    first = gethreading.inthread(wait, random.random())
    print("f({}) middle".format(x))
    second = gethreading.inthread(wait, random.random())
    # Alternative kept for reference: run both waits on one acquired thread.
    ## with osthread_acquire() as t:
    ##     t1 = inthread(t, wait, random.random())
    ##     t2 = inthread(t, wait, random.random())
    print("f({}) end [t1={}, t2={}]".format(x, first.name, second.name))
# Spawn one greenlet per letter, each running f, then wait for all of them.
jobs = [gevent.spawn(f, letter) for letter in 'abcde']
gevent.joinall(jobs)
##print "---"
##
##_rt = set()
##while True:
## rt = get_running_threads()
## if not rt:
## break
## if rt != _rt:
## print "Running threads: {}".format(', '.join(t.name for t in rt))
## _rt = rt
## time.sleep(0.008)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment