Skip to content

Instantly share code, notes, and snippets.

@timofurrer
Created June 28, 2019 12:09
Show Gist options
  • Save timofurrer/db44ad05ffffd74f73384e2eb0bfb682 to your computer and use it in GitHub Desktop.
Save timofurrer/db44ad05ffffd74f73384e2eb0bfb682 to your computer and use it in GitHub Desktop.
Python 3 Implementation of prioritized Lock objects
"""
This module implements a set of prioritized locks.
Implemented:
* PriorityLock
* FIFOPriorityLock
License: MIT <tuxtimo@gmail.com>
"""
import time
import queue
import threading
class PriorityLock:
"""Lock object which prioritizes each acquire
>>> import random
>>> thread_exec_order = []
>>> lock = PriorityLock()
>>> def worker(priority):
... with lock(priority):
... time.sleep(0.2)
... thread_exec_order.append(priority)
>>> threads = [
... threading.Thread(target=worker, args=(p,))
... for p in range(10)
... ]
>>> random.shuffle(threads)
>>> for thread in threads:
... thread.start()
>>> for thread in threads:
... thread.join()
>>> # the first thread to be executed is non-deterministic
>>> assert thread_exec_order[1:] == list(sorted(thread_exec_order[1:]))
"""
class _Context:
def __init__(self, lock, priority):
self._lock = lock
self._priority = priority
def __enter__(self):
self._lock.acquire(self._priority)
def __exit__(self, exc_type, exc_val, exc_tb):
self._lock.release()
def __init__(self):
self._lock = threading.Lock()
self._acquire_queue = queue.PriorityQueue()
self._need_to_wait = False
def acquire(self, priority):
with self._lock:
if not self._need_to_wait:
self._need_to_wait = True
return True
event = threading.Event()
self._acquire_queue.put((priority, event))
event.wait()
return True
def release(self):
with self._lock:
try:
_, event = self._acquire_queue.get_nowait()
except queue.Empty:
self._need_to_wait = False
else:
event.set()
def __call__(self, priority):
return self._Context(self, priority)
class FIFOPriorityLock(PriorityLock):
"""Lock object which prioritizes acquires by first-comes-first-serves
>>> import random
>>> thread_exec_order = []
>>> lock = FIFOPriorityLock()
>>> def worker():
... with lock:
... time.sleep(0.2)
... thread_exec_order.append(threading.current_thread())
>>> threads = [
... threading.Thread(target=worker, name=str(x))
... for x in range(10)
... ]
>>> random.shuffle(threads)
>>> for thread in threads:
... thread.start()
>>> for thread in threads:
... thread.join()
>>> assert thread_exec_order == threads
"""
def acquire(self):
acquiring_time = time.time()
super().acquire(priority=acquiring_time)
def __enter__(self):
self.acquire()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment