-
-
Save api-padawan/5421867 to your computer and use it in GitHub Desktop.
Python 2.6 with a Python 3.2-style semaphore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import weakref | |
import time | |
import thread_util_copy | |
# Number of worker threads the script launches.
nthreads = 1000

# Storage whose attributes are private to each thread; every worker parks
# its sentinel object here.
local = threading.local()

# Semaphore with five slots, shared by all workers.
sem = thread_util_copy.BoundedSemaphore(value=5)
class Vigil(object):
    """Empty marker object.

    It carries no state; instances exist only so that a weak reference
    (with a callback) can be attached to them.
    """
class MyThread(threading.Thread):
    """Worker that takes a semaphore slot and never releases it itself.

    The slot is handed back by a weak-reference callback attached to a
    ``Vigil`` stored in thread-local storage, i.e. when that ``Vigil`` is
    garbage-collected.
    NOTE(review): the intent appears to be that this happens once the
    thread has died and its thread-local data is cleaned up -- confirm.
    """

    def run(self):
        """Park a ``Vigil`` in thread-local storage, then acquire ``sem``."""

        def release_slot(_dead_ref):
            # Invoked by the weakref machinery once the Vigil is gone.
            sem.release()

        local.vigil = Vigil()
        # Keep the weak reference on the thread object: the callback only
        # fires while the weakref object itself is still alive.
        self.ref = weakref.ref(local.vigil, release_slot)
        # Blocks while all slots are taken; deliberately no matching
        # release() here.
        sem.acquire()
# Launch every worker, then wait until all of them have finished.
threads = [MyThread() for _ in range(nthreads)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Give the weakref callbacks a little time to return every slot: poll up
# to five times, sleeping 0.1 s between polls.
for _ in range(5):
    if sem.counter == 5:
        break
    getattr(local, 'c', None)  # Trigger cleanup in 2.6
    time.sleep(.1)

# Report exactly one outcome.  The final read also covers the case where
# the counter only reaches 5 during the last sleep, which previously
# printed nothing at all.
final_count = sem.counter
if final_count == 5:
    # Parenthesised single-argument print gives the same output on
    # Python 2.6 and Python 3.
    print('success!')
else:
    print('fail %s' % final_count)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
try: | |
from time import monotonic as _time | |
except ImportError: | |
from time import time as _time | |
### Begin backport from CPython 3.2 for timeout support for Semaphore.acquire
class Semaphore(object):
    """Counting semaphore whose ``acquire`` accepts a timeout.

    The internal counter is decremented by ``acquire`` and incremented by
    ``release``; ``acquire`` waits while the counter is zero.  Instances
    can be used as context managers (``with sem: ...``).
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1):
        """Create a semaphore with ``value`` initial slots.

        Raises:
            ValueError: if ``value`` is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = threading.Condition(threading.Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        """Try to take one slot.

        Args:
            blocking: when false, return immediately instead of waiting.
            timeout: maximum number of seconds to wait, or ``None`` to
                wait indefinitely.  Only valid together with ``blocking``.

        Returns:
            ``True`` if a slot was taken, ``False`` otherwise.

        Raises:
            ValueError: if ``timeout`` is given for a non-blocking call.
        """
        if not blocking and timeout is not None:
            raise ValueError("can't specify timeout for non-blocking acquire")
        rc = False
        endtime = None
        # ``with`` guarantees the condition's lock is released even if
        # something below raises (bad timeout type, interrupted wait, ...).
        with self._cond:
            while self._value == 0:
                if not blocking:
                    break
                if timeout is not None:
                    if endtime is None:
                        # First pass: fix the deadline once.
                        endtime = _time() + timeout
                    else:
                        # Woken without a free slot: wait only for the
                        # time that is left.
                        timeout = endtime - _time()
                        if timeout <= 0:
                            break
                self._cond.wait(timeout)
            else:
                # Loop ended without ``break``: a slot is available.
                self._value -= 1
                rc = True
        return rc

    __enter__ = acquire

    def release(self):
        """Return one slot and wake one waiting thread, if any."""
        with self._cond:
            self._value += 1
            self._cond.notify()

    def __exit__(self, t, v, tb):
        """Release the slot taken by ``__enter__``."""
        self.release()

    @property
    def counter(self):
        """Current value of the internal counter (read without locking)."""
        return self._value
class BoundedSemaphore(Semaphore):
    """Semaphore that checks that # releases is <= # acquires"""

    def __init__(self, value=1):
        """Create the semaphore and remember ``value`` as its upper bound."""
        Semaphore.__init__(self, value)
        self._initial_value = value

    def release(self):
        """Return one slot and wake one waiting thread, if any.

        Raises:
            ValueError: if the counter is already at its initial value,
                i.e. there have been more releases than acquires.
        """
        # Check and increment under a single hold of the lock; checking
        # before taking the lock would let two concurrent releases both
        # pass the test and push the counter past the bound.  The base
        # class logic is inlined because the underlying lock is not
        # re-entrant.
        with self._cond:
            if self._value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._value += 1
            self._cond.notify()
### End backport from CPython 3.2
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment