Skip to content

Instantly share code, notes, and snippets.

@yetone
Last active September 19, 2019 06:24
Show Gist options
  • Save yetone/d43d9e1d282eb2df6214bb503151756d to your computer and use it in GitHub Desktop.
Save yetone/d43d9e1d282eb2df6214bb503151756d to your computer and use it in GitHub Desktop.
from threading import RLock
from typing import Dict
class _RLockWrapper:
    """Re-entrant lock that also counts its outstanding acquisitions.

    ``count`` is incremented on every successful ``acquire`` and decremented
    on every ``release``, so ``is_released`` can report whether any
    acquisition made through this wrapper is still outstanding.
    """

    def __init__(self):
        self.lock = RLock()
        # Successful ``acquire`` calls not yet matched by a ``release``.
        self.count = 0

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        """Acquire the underlying lock and bump the counter on success.

        Args:
            blocking: Forwarded to ``threading.RLock.acquire``.
            timeout: Forwarded to ``threading.RLock.acquire``; ``-1`` means
                wait without a time limit.

        Returns:
            ``True`` if the lock was acquired, ``False`` otherwise.
        """
        # The defaults mirror RLock.acquire so a bare ``acquire()`` works;
        # an Ellipsis default would be rejected by RLock as a timeout value.
        locked = self.lock.acquire(blocking=blocking, timeout=timeout)
        if locked:
            self.count += 1
        return locked

    def release(self) -> None:
        """Release one level of ownership and decrement the counter.

        Raises:
            RuntimeError: Propagated from ``threading.RLock.release`` when
                the calling thread does not own the lock; the counter is
                left unchanged in that case.
        """
        # Decrement while this thread still owns the lock so the update
        # cannot interleave with the next owner's increment in ``acquire``.
        self.count -= 1
        try:
            self.lock.release()
        except RuntimeError:
            self.count += 1
            raise

    def is_released(self) -> bool:
        """Return ``True`` when no acquisition is currently outstanding."""
        return self.count == 0
class PLock:
    """Lock whose acquisitions are tagged with an integer priority.

    One re-entrant lock wrapper is kept per priority in ``locks``. A caller
    whose priority is greater than ``current_priority`` takes its own
    per-priority lock straight away. A caller at or below
    ``current_priority`` first waits for the lock registered under
    ``current_priority`` if that lock is currently held, and only then takes
    the lock of its own priority.

    NOTE(review): callers at different priorities hold different underlying
    locks, so a higher-priority caller never waits for a lower-priority
    holder — confirm that this is the intended contract.
    """

    def __init__(self):
        # Guards ``locks`` and ``current_priority``. It is never held while
        # blocking on a per-priority lock.
        self.self_lock = RLock()
        self.locks: Dict[int, _RLockWrapper] = {}
        self.current_priority = 0

    def acquire(self, priority: int, blocking=True, timeout=-1) -> bool:
        """Acquire the lock for ``priority``.

        Args:
            priority: Priority level of the caller.
            blocking: Forwarded to the underlying ``RLock.acquire`` calls.
            timeout: Overall time budget in seconds for a blocking call;
                ``-1`` waits without limit.

        Returns:
            ``True`` if the lock for ``priority`` is now held by the calling
            thread, ``False`` if the wait failed or timed out.
        """
        barrier = None
        # ``with`` guarantees the bookkeeping lock is released even if the
        # bookkeeping itself raises.
        with self.self_lock:
            lock = self.locks.get(priority)
            if lock is None:
                lock = self.locks[priority] = _RLockWrapper()
            if priority > self.current_priority:
                self.current_priority = priority
            else:
                cp_lock = self.locks.get(self.current_priority)
                if cp_lock is not None and not cp_lock.is_released():
                    # NOTE(review): this lowers ``current_priority`` while
                    # the higher-priority lock is still held, so a later
                    # low-priority caller may skip the wait — kept as in the
                    # original; confirm whether that is intended.
                    self.current_priority = priority
                    if cp_lock is not lock:
                        barrier = cp_lock

        if barrier is not None:
            # Kept local so the block does not rely on a module-level import.
            import time

            deadline = None
            if blocking and timeout >= 0:
                deadline = time.monotonic() + timeout
            # Use the higher-priority lock purely as a gate: wait until it is
            # free, then give it straight back. Keeping it would leave this
            # thread owning a lock that ``release(priority)`` never releases.
            # The raw RLock is used so the wrapper's counter is not disturbed.
            if not barrier.lock.acquire(blocking=blocking, timeout=timeout):
                return False
            barrier.lock.release()
            if deadline is not None:
                # Spend only what is left of the caller's time budget.
                timeout = max(0.0, deadline - time.monotonic())

        return lock.acquire(blocking=blocking, timeout=timeout)

    def release(self, priority: int):
        """Release one acquisition of the lock for ``priority``.

        Does nothing when no lock is registered for ``priority``. Once the
        wrapper reports no outstanding acquisitions, its entry is removed
        from ``locks``.
        """
        with self.self_lock:
            lock = self.locks.get(priority)
            if lock is None:
                return
            if not lock.is_released():
                lock.release()
            # NOTE(review): the original indentation was lost; this check is
            # assumed to be a sibling of the one above, not nested — confirm.
            if lock.is_released():
                self.locks.pop(priority)

    def with_priority(self, priority=0) -> '_LockWithPriority':
        """Return a context manager that acquires this lock at ``priority``."""
        return _LockWithPriority(self, priority)

    def __enter__(self):
        # Plain ``with plock:`` uses priority 0.
        return self.acquire(0)

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release(0)
class _LockWithPriority:
    """Context manager binding a lock to a fixed priority.

    Entering calls ``lock.acquire(priority)``; exiting calls
    ``lock.release(priority)``.
    """

    def __init__(self, lock: 'PLock', priority: int):
        self.lock = lock
        self.priority = priority

    def __enter__(self):
        """Acquire the lock at the bound priority and return the result."""
        return self.lock.acquire(self.priority)

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock; let any in-flight exception propagate."""
        self.lock.release(self.priority)
        # Returning a false value tells the ``with`` statement to re-raise
        # the original exception itself. Raising ``exc_val`` here would add
        # this frame to its traceback and would depend on its truthiness.
        return False
import time
from threading import Thread
from prioritized_lock import PLock
# Single PLock instance shared by the demo worker functions below.
lock = PLock()
def m():
    """Demo worker: loop forever, taking the shared lock at priority 0.

    Each iteration prints a "run" line, then prints three progress lines
    half a second apart while holding the lock.
    """
    count = -1
    while True:
        count += 1
        print(f'[{count}] m run')
        # NOTE(review): the original indentation was lost; every statement
        # after the ``with`` line is assumed to sit inside it — confirm.
        with lock.with_priority(0):
            print(f'[{count}] m: a')
            time.sleep(0.5)
            print(f'[{count}] m: b')
            time.sleep(0.5)
            print(f'[{count}] m: c')
            time.sleep(0.5)
def n():
    """Demo worker: every few seconds, take the shared lock at priority 1.

    Each iteration sleeps four seconds, prints a "run" line, then prints six
    progress lines one second apart while holding the lock.
    """
    count = -1
    while True:
        time.sleep(4)
        count += 1
        print(f'[{count}] n run')
        # NOTE(review): the original indentation was lost; every statement
        # after the ``with`` line is assumed to sit inside it — confirm.
        with lock.with_priority(1):
            print(f'[{count}] n: a')
            time.sleep(1)
            print(f'[{count}] n: b')
            time.sleep(1)
            print(f'[{count}] n: c')
            time.sleep(1)
            print(f'[{count}] n: d')
            time.sleep(1)
            print(f'[{count}] n: e')
            time.sleep(1)
            print(f'[{count}] n: f')
            time.sleep(1)
def main():
    """Start the two demo workers and block on the first one."""
    # Both workers loop forever. Daemon threads let the process exit on
    # Ctrl-C instead of hanging at interpreter shutdown waiting for them.
    mt = Thread(target=m, daemon=True)
    nt = Thread(target=n, daemon=True)
    mt.start()
    nt.start()
    mt.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment