Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python asyncio read-write lock, using a generic first-in-first-out lock
import asyncio
import collections
import contextlib
class Read(asyncio.Future):
@staticmethod
def is_compatible(holds):
return not holds[Write]
class Write(asyncio.Future):
@staticmethod
def is_compatible(holds):
return not holds[Read] and not holds[Write]
class FifoLock():
def __init__(self):
self._waiters = collections.deque()
self._holds = collections.defaultdict(int)
def _maybe_acquire(self):
while True:
if not self._waiters:
break
if self._waiters[0].cancelled():
self._waiters.popleft()
continue
if self._waiters[0].is_compatible(self._holds):
waiter = self._waiters.popleft()
self._holds[type(waiter)] += 1
waiter.set_result(None)
continue
break
@contextlib.asynccontextmanager
async def __call__(self, lock_mode_type):
lock_mode = lock_mode_type()
self._waiters.append(lock_mode)
self._maybe_acquire()
try:
await lock_mode
except asyncio.CancelledError:
self._maybe_acquire()
raise
try:
yield
finally:
self._holds[type(lock_mode)] -= 1
self._maybe_acquire()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.