Path lock using a global read/write lock
import asyncio
import contextlib
from fifolock import FifoLock
class Read(asyncio.Future):
def is_compatible(holds):
return not holds[Write]
class Write(asyncio.Future):
def is_compatible(holds):
return not holds[Read] and not holds[Write]
class PathLock():
def __init__(self):
self.lock = FifoLock()
async def __call__(self, read, write):
mode = Write if write else Read
async with self.lock(mode):
