@michalc
Last active November 25, 2018 09:17
Path lock using a global read/write lock
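import asyncio
import contextlib

from fifolock import FifoLock


class Read(asyncio.Future):
    # A read can be granted while other reads are held, but not during a write.
    @staticmethod
    def is_compatible(holds):
        return not holds[Write]


class Write(asyncio.Future):
    # A write is exclusive: it needs no reads and no other write to be held.
    @staticmethod
    def is_compatible(holds):
        return not holds[Read] and not holds[Write]


class PathLock():

    def __init__(self):
        # A single lock shared by every path, hence "global".
        self.lock = FifoLock()

    @contextlib.asynccontextmanager
    async def __call__(self, read, write):
        # Only the truthiness of `write` is used: a truthy value takes the
        # lock in Write mode, anything else takes it in Read mode.
        # `read` is accepted but not inspected.
        mode = Write if write else Read
        async with self.lock(mode):
            yield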
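
The sketch below shows how the lock above might behave with two readers and one writer. To stay runnable without installing fifolock, it swaps in `StandInFifoLock`, a hypothetical and simplified stand-in written for this example. It is not the library's implementation and it does not handle cancellation. The `demo` and `task` helpers and the paths `/a`, `/b`, `/c` are also assumptions made for illustration.

```python
import asyncio
import collections
import contextlib


class Read(asyncio.Future):
    @staticmethod
    def is_compatible(holds):
        return not holds[Write]


class Write(asyncio.Future):
    @staticmethod
    def is_compatible(holds):
        return not holds[Read] and not holds[Write]


class StandInFifoLock:
    """Hypothetical simplified stand-in for fifolock.FifoLock (no cancellation handling)."""

    def __init__(self):
        self._waiters = collections.deque()
        self._holds = collections.defaultdict(int)

    def _grant(self):
        # Grant strictly in arrival order; stop at the first incompatible waiter.
        while self._waiters and self._waiters[0].is_compatible(self._holds):
            waiter = self._waiters.popleft()
            self._holds[type(waiter)] += 1
            waiter.set_result(None)

    @contextlib.asynccontextmanager
    async def __call__(self, mode_type):
        waiter = mode_type()
        self._waiters.append(waiter)
        self._grant()
        await waiter
        try:
            yield
        finally:
            self._holds[type(waiter)] -= 1
            self._grant()


class PathLock:
    def __init__(self):
        self.lock = StandInFifoLock()

    @contextlib.asynccontextmanager
    async def __call__(self, read, write):
        mode = Write if write else Read
        async with self.lock(mode):
            yield


async def demo():
    lock = PathLock()
    events = []

    async def task(name, read, write):
        async with lock(read=read, write=write):
            events.append(f"{name} start")
            await asyncio.sleep(0.01)
            events.append(f"{name} end")

    # Two readers and one writer, queued in this order.
    await asyncio.gather(
        task("r1", read=["/a"], write=[]),
        task("r2", read=["/b"], write=[]),
        task("w1", read=[], write=["/c"]),
    )
    return events


events = asyncio.run(demo())
# Both readers start before either finishes; the writer runs only after both end.
print(events)
```

Because the lock is global, the writer on `/c` waits for the readers on `/a` and `/b` even though the paths differ. That is the trade-off of this approach: simple, but coarser than a true per-path lock.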