Skip to content

Instantly share code, notes, and snippets.

@ficapy
Created March 12, 2018 08:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ficapy/50846446b1852eba74fdc5f7465ea86c to your computer and use it in GitHub Desktop.
Save ficapy/50846446b1852eba74fdc5f7465ea86c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: ficapy
# Create: '12/03/2018'
from threading import Lock, get_ident
class RLock:
    """Re-entrant lock built on top of a plain ``threading.Lock``.

    The thread that currently owns the lock may acquire it again without
    blocking; the underlying lock is only released once ``release()`` has
    been called as many times as ``acquire()``.

    Attributes:
        lock: the underlying non-re-entrant ``Lock``.
        _owner: thread identifier of the owning thread, or ``None``.
        _count: recursion depth held by the owner (0 when unlocked).
    """

    def __init__(self):
        self.lock = Lock()
        self._owner = None
        self._count = 0

    def acquire(self, blocking=True, timeout=-1):
        """Acquire the lock, re-entrantly if the caller already owns it.

        Args:
            blocking: when false, return immediately instead of waiting.
            timeout: maximum number of seconds to wait; ``-1`` means forever.

        Returns:
            ``True`` if the lock is now held by the caller, ``False`` if it
            could not be obtained (non-blocking call or timeout expired).
        """
        me = get_ident()
        if self._owner == me:
            # Only the owner can observe its own ident here, so the
            # recursion counter can be bumped without further locking.
            self._count += 1
            return True
        acquired = self.lock.acquire(blocking, timeout)
        if acquired:
            self._owner = me
            self._count = 1
        return acquired

    __enter__ = acquire

    def release(self):
        """Undo one ``acquire()``; unlock when the outermost level is left.

        Raises:
            RuntimeError: if the calling thread does not own the lock.
        """
        if self._owner != get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        self._count -= 1
        if self._count == 0:
            # Clear ownership before unlocking so the next owner never
            # sees a stale ident.
            self._owner = None
            self.lock.release()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
from collections import deque
class Condition:
    """Condition variable backed by an ``RLock`` and one ``Lock`` per waiter.

    Use it as a context manager to hold the underlying lock, then call
    ``wait()`` to sleep until another thread calls ``notify()``.
    """

    def __init__(self):
        self._lock = RLock()
        # Locked ``Lock`` objects, one per sleeping thread, oldest first.
        self._waiters = deque()

    def __enter__(self):
        return self._lock.__enter__()

    def __exit__(self, *args):
        return self._lock.__exit__(*args)

    def wait(self):
        """Release the lock, block until notified, then re-acquire the lock.

        The lock is released completely, even when the caller holds it
        re-entrantly, and the same recursion depth is restored before
        returning; otherwise a notifier could never get hold of the lock.

        Returns:
            ``True`` once the thread has been woken by ``notify()``.

        Raises:
            AssertionError: if the calling thread does not hold the lock.
        """
        assert get_ident() == self._lock._owner, "cannot wait on un-acquired lock"
        waiter = Lock()
        waiter.acquire()  # lock it so the second acquire below blocks
        self._waiters.append(waiter)

        depth = self._lock._count
        for _ in range(depth):
            self._lock.release()

        woken = False
        try:
            waiter.acquire()  # blocks until notify() releases this waiter
            woken = True
            return True
        finally:
            # Always restore the caller's hold on the lock, even if the
            # blocking acquire above was interrupted.
            for _ in range(depth):
                self._lock.acquire()
            if not woken:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass  # a notifier already dequeued it

    def notify(self, n=None):
        """Wake waiting threads, oldest first.

        Args:
            n: maximum number of waiters to wake. ``None`` (the default)
                wakes every thread currently waiting.

        Raises:
            AssertionError: if the calling thread does not hold the lock.
        """
        assert get_ident() == self._lock._owner, "cannot notify on un-acquired lock"
        pending = len(self._waiters)
        count = pending if n is None else min(n, pending)
        for _ in range(count):
            self._waiters.popleft().release()

    def notify_all(self):
        """Wake every thread currently waiting on this condition."""
        self.notify()
class Semaphore:
    """Counting semaphore usable as a context manager.

    Entering the ``with`` block takes one unit from ``value`` (blocking
    while none is available); leaving the block gives the unit back.
    """

    def __init__(self, value=1):
        """Create a semaphore holding ``value`` units.

        Raises:
            ValueError: if ``value`` is negative.
        """
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._lock = Condition()
        self.value = value

    def __enter__(self):
        with self._lock:
            # Re-check after every wake-up: another thread may have taken
            # the unit first, and a woken thread must still consume one.
            while self.value == 0:
                self._lock.wait()
            self.value -= 1

    def __exit__(self, exc_type, exc_val, exc_tb):
        # The counter update and notify() must happen while holding the
        # condition's lock; notify() checks that the caller owns it.
        with self._lock:
            self.value += 1
            self._lock.notify()
class BoundedSemaphore(Semaphore):
    """Semaphore subclass that only forwards its initial value.

    NOTE(review): despite the name, no upper bound is enforced here; the
    class currently behaves exactly like ``Semaphore``.
    """

    def __init__(self, value=1):
        # Delegate straight to the parent constructor.
        super().__init__(value=value)
class Event:
    """One-bit flag that threads can wait on.

    ``set()`` raises the flag and wakes waiting threads, ``clear()``
    lowers it, and ``wait()`` blocks until the flag has been raised.
    """

    def __init__(self):
        self._cond = Condition()
        self._flag = False

    def is_set(self):
        """Return ``True`` if the flag is currently raised."""
        return self._flag

    def set(self):
        """Raise the flag and notify threads blocked in ``wait()``."""
        with self._cond:
            self._flag = True
            self._cond.notify()

    def clear(self):
        """Lower the flag so that later ``wait()`` calls block again."""
        with self._cond:
            self._flag = False

    def wait(self):
        """Block until the flag is raised.

        Returns:
            ``True`` always: either the flag was already raised, or the
            thread was woken by ``set()``. The result of
            ``Condition.wait()`` is deliberately not used, since it is not
            guaranteed to be a meaningful boolean.
        """
        with self._cond:
            if not self._flag:
                self._cond.wait()
            return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment