Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Path lock using a global read/write lock
import asyncio
import contextlib
from fifolock import FifoLock
class Read(asyncio.Future):
@staticmethod
def is_compatible(holds):
return not holds[Write]
class Write(asyncio.Future):
@staticmethod
def is_compatible(holds):
return not holds[Read] and not holds[Write]
class PathLock():
def __init__(self):
self.lock = FifoLock()
@contextlib.asynccontextmanager
async def __call__(self, read, write):
mode = Write if write else Read
async with self.lock(mode):
yield
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.