Skip to content

Instantly share code, notes, and snippets.

@x42005e1f
Last active May 11, 2025 20:46
Show Gist options
  • Save x42005e1f/a50d0744013b7bbbd7ded608d6a3845b to your computer and use it in GitHub Desktop.
Save x42005e1f/a50d0744013b7bbbd7ded608d6a3845b to your computer and use it in GitHub Desktop.
A group-level lock (async-aware & thread-aware)
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2025 Ilya Egorov <0x42005e1f@gmail.com>
# SPDX-License-Identifier: ISC
from __future__ import annotations
from collections import OrderedDict
from operator import methodcaller
from typing import TYPE_CHECKING
from aiologic.lowlevel import (
Event,
async_checkpoint,
current_async_task_ident,
current_green_task_ident,
green_checkpoint,
)
from aiologic.lowlevel._thread import LockType, allocate_lock
# --- Version-compatibility shims ---------------------------------------------
# aiologic renamed its event factories in 0.15.0; fall back to the old
# class names so this module works with both old and new releases.
try:
    from aiologic.lowlevel import create_async_event, create_green_event
except ImportError:  # aiologic<0.15.0
    from aiologic.lowlevel import (
        AsyncEvent as create_async_event,
        GreenEvent as create_green_event,
    )

# The checkpoint-enabled probes appeared in 0.15.0; the fallbacks
# conservatively report True so the acquire paths always checkpoint on
# older versions.
try:
    from aiologic.lowlevel import (
        async_checkpoint_enabled,
        green_checkpoint_enabled,
    )
except ImportError:  # aiologic<0.15.0

    def async_checkpoint_enabled() -> bool:
        return True

    def green_checkpoint_enabled() -> bool:
        return True

if TYPE_CHECKING:
    import sys

    from types import TracebackType

    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self

    if sys.version_info >= (3, 9):
        from collections.abc import Callable
    else:
        from typing import Callable

# Event cancellation is queried via .cancelled() in aiologic>=0.15.0 and
# via .cancel() before that; bind whichever method the installed version
# exposes as a uniform callable.
if hasattr(Event, "cancelled"):
    cancelled = methodcaller("cancelled")
else:  # aiologic<0.15.0
    cancelled = methodcaller("cancel")
class GLock:
    """A group-level lock (async-aware & thread-aware).

    The lock is held by a *group* (any hashable object) rather than by a
    single task.  While a group holds the lock, further tasks acquiring
    on behalf of the same group enter immediately — unless other groups
    are already queued, in which case newcomers wait (this is what makes
    a readers-writer use of the lock phase-fair rather than
    read-preferring).  Each owning task additionally keeps a per-task
    reentrancy counter, so within one task the lock behaves like an
    RLock.

    A ``GLock`` may wrap another ``GLock`` to share all of its
    synchronization state while contributing a different default group;
    that is how the readers-writer configuration in the examples is
    built.
    """

    __slots__ = (
        "__mutex",
        "__waiters",
        "__weakref__",
        "_default_group",
        "_default_group_factory",
        "_group",
        "owners",
        "wrapped",
    )

    # Thread mutex guarding transitions of __waiters / owners / group.
    __mutex: LockType
    # FIFO mapping of pending group -> {task ident: wakeup event}.
    # Insertion order decides which group is admitted next (__release
    # pops with last=False).
    __waiters: OrderedDict[object, dict[tuple[str, int], Event]]
    # Group used when acquire is called with group=None (see __init__).
    _default_group: object
    _default_group_factory: Callable[[], object] | None
    # Group currently holding the lock; None when unlocked.  Only
    # meaningful on the root lock — wrapping locks delegate via the
    # ``group`` property.
    _group: object
    # task ident -> reentrancy count for every task in the owning group.
    owners: dict[tuple[str, int], int]
    # The root lock whose state this instance shares, or None if this
    # instance is itself the root.
    wrapped: GLock | None

    def __init__(
        self,
        /,
        wrapped: GLock | None = None,
        *,
        default_group: object = None,
        default_group_factory: Callable[[], object] | None = None,
    ) -> None:
        """Create a group-level lock.

        :param wrapped: another ``GLock`` whose internal state (mutex,
            waiter queue, owner table) this lock shares; both then guard
            the same resource.
        :param default_group: group used when acquiring with
            ``group=None``.
        :param default_group_factory: zero-argument callable producing
            the default group per acquisition (e.g. the current thread
            ident).  Mutually exclusive with ``default_group``.
        :raises ValueError: if both ``default_group`` and
            ``default_group_factory`` are given.
        """
        if default_group is not None and default_group_factory is not None:
            msg = "cannot specify both default_group and default_group_factory"
            raise ValueError(msg)

        self._default_group = default_group
        self._default_group_factory = default_group_factory

        if wrapped is not None:
            # Alias (not copy) the root's state; note _group is left
            # unset here — the ``group`` property always reads it from
            # the root when ``wrapped`` is not None.
            self.__mutex = wrapped.__mutex
            self.__waiters = wrapped.__waiters
            self.owners = wrapped.owners
            self.wrapped = wrapped
        else:
            self.__mutex = allocate_lock()
            self.__waiters = OrderedDict()
            self._group = None
            self.owners = {}
            self.wrapped = None

    def __repr__(self, /) -> str:
        cls = self.__class__
        cls_repr = f"{cls.__module__}.{cls.__qualname__}"
        return f"{cls_repr}()"

    def __bool__(self, /) -> bool:
        # Truthy while any group holds the lock (same as locked()).
        return self.group is not None

    async def __aenter__(self, /) -> Self:
        """Acquire for the default group from an async task."""
        await self.async_acquire()

        return self

    async def __aexit__(
        self,
        /,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.async_release()

    def __enter__(self, /) -> Self:
        """Acquire for the default group from a thread / greenlet."""
        self.green_acquire()

        return self

    def __exit__(
        self,
        /,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.green_release()

    async def async_acquire_on_behalf_of(
        self,
        /,
        group: object,
        *,
        blocking: bool = True,
    ) -> bool:
        """Acquire the lock on behalf of *group* from an async task.

        :param group: the group to join; ``None`` selects the default
            group (factory result, ``default_group``, or the task's own
            ident, in that order).
        :param blocking: when False, return immediately instead of
            waiting for the current group to leave.
        :returns: True if acquired, False only for a failed
            non-blocking attempt.
        """
        task = current_async_task_ident()

        # Reentrant fast path: the task already owns the lock, so just
        # bump its counter (with a cooperative checkpoint if blocking).
        if task in self.owners:
            if blocking:
                await async_checkpoint()

            self.owners[task] += 1

            return True

        # Resolve group=None to the configured default, falling back to
        # the task ident (which makes the lock a plain RLock).
        if group is None:
            if self._default_group_factory is not None:
                group = self._default_group_factory()
            elif self._default_group is not None:
                group = self._default_group
            else:
                group = task

        mutex = self.__mutex
        mutex.acquire()
        mutex_acquired = True

        try:
            # Admit immediately if the lock is free, or if our group
            # already holds it AND no other group is queued (the waiter
            # check is what makes this phase-fair, not read-preferring).
            if self.group is None or (
                self.group == group and not self.__waiters
            ):
                if blocking and async_checkpoint_enabled():
                    # Checkpoint without holding the mutex, then
                    # re-check the admission condition since the state
                    # may have changed while we were suspended.
                    mutex_acquired = False
                    mutex.release()

                    await async_checkpoint()

                    mutex.acquire()
                    mutex_acquired = True

                if self.group is None or (
                    self.group == group and not self.__waiters
                ):
                    self.group = group
                    self.owners[task] = 1

                    return True

            if not blocking:
                return False

            # Queue ourselves under our group and wait to be woken by
            # __release (which makes us an owner before setting the
            # event).
            events = self.__waiters.setdefault(group, {})
            events[task] = event = create_async_event()

            success = False
            mutex_acquired = False
            mutex.release()

            try:
                success = await event
            finally:
                if not success:
                    # Woken abnormally (cancellation): undo whichever
                    # state we are in under the mutex.
                    mutex.acquire()
                    mutex_acquired = True

                    if cancelled(event):
                        # Still queued: drop our waiter entry (and the
                        # group's bucket if we were its last member —
                        # the identity check guards against the bucket
                        # having been replaced meanwhile).
                        del events[task]

                        if not events and self.__waiters.get(group) is events:
                            del self.__waiters[group]
                    else:
                        # __release already made us an owner before the
                        # cancellation landed: give the ownership back,
                        # handing off to the next group if we were the
                        # last owner.
                        del self.owners[task]

                        if not self.owners:
                            self.group = None
                            self.__release()

            return success
        finally:
            if mutex_acquired:
                mutex.release()

    def green_acquire_on_behalf_of(
        self,
        /,
        group: object,
        *,
        blocking: bool = True,
        timeout: float | None = None,
    ) -> bool:
        """Acquire the lock on behalf of *group* from a thread/greenlet.

        Mirror of :meth:`async_acquire_on_behalf_of` with an optional
        *timeout* (seconds; ``None`` waits indefinitely).

        :returns: True if acquired, False on a failed non-blocking
            attempt or on timeout.
        """
        task = current_green_task_ident()

        # Reentrant fast path (see async variant).
        if task in self.owners:
            if blocking:
                green_checkpoint()

            self.owners[task] += 1

            return True

        # Resolve group=None to the configured default.
        if group is None:
            if self._default_group_factory is not None:
                group = self._default_group_factory()
            elif self._default_group is not None:
                group = self._default_group
            else:
                group = task

        mutex = self.__mutex
        mutex.acquire()
        mutex_acquired = True

        try:
            # Same phase-fair admission condition as the async variant.
            if self.group is None or (
                self.group == group and not self.__waiters
            ):
                if blocking and green_checkpoint_enabled():
                    # Checkpoint outside the mutex, then re-check.
                    mutex_acquired = False
                    mutex.release()

                    green_checkpoint()

                    mutex.acquire()
                    mutex_acquired = True

                if self.group is None or (
                    self.group == group and not self.__waiters
                ):
                    self.group = group
                    self.owners[task] = 1

                    return True

            if not blocking:
                return False

            # Queue under our group and wait (bounded by timeout).
            events = self.__waiters.setdefault(group, {})
            events[task] = event = create_green_event()

            success = False
            mutex_acquired = False
            mutex.release()

            try:
                success = event.wait(timeout)
            finally:
                if not success:
                    # Timed out or cancelled: undo under the mutex
                    # (see async variant for the two cases).
                    mutex.acquire()
                    mutex_acquired = True

                    if cancelled(event):
                        del events[task]

                        if not events and self.__waiters.get(group) is events:
                            del self.__waiters[group]
                    else:
                        del self.owners[task]

                        if not self.owners:
                            self.group = None
                            self.__release()

            return success
        finally:
            if mutex_acquired:
                mutex.release()

    async def async_acquire(self, /, *, blocking: bool = True) -> bool:
        """Acquire on behalf of the default group (async)."""
        return await self.async_acquire_on_behalf_of(None, blocking=blocking)

    def green_acquire(
        self,
        /,
        *,
        blocking: bool = True,
        timeout: float | None = None,
    ) -> bool:
        """Acquire on behalf of the default group (thread/greenlet)."""
        return self.green_acquire_on_behalf_of(
            None,
            blocking=blocking,
            timeout=timeout,
        )

    def async_release(self, /) -> None:
        """Release one level of the current async task's hold.

        :raises RuntimeError: if the lock is unlocked or the current
            task is not among the owners.
        """
        if self.group is None:
            msg = "release unlocked lock"
            raise RuntimeError(msg)

        task = current_async_task_ident()

        if task not in self.owners:
            msg = "the current task is not holding this lock"
            raise RuntimeError(msg)

        self.owners[task] -= 1

        # Only the final release of this task touches shared state; the
        # last owner of the group unlocks and hands off to the next
        # queued group.
        if not self.owners[task]:
            with self.__mutex:
                del self.owners[task]

                if not self.owners:
                    self.group = None
                    self.__release()

    def green_release(self, /) -> None:
        """Release one level of the current thread/greenlet's hold.

        :raises RuntimeError: if the lock is unlocked or the current
            task is not among the owners.
        """
        if self.group is None:
            msg = "release unlocked lock"
            raise RuntimeError(msg)

        task = current_green_task_ident()

        if task not in self.owners:
            msg = "the current task is not holding this lock"
            raise RuntimeError(msg)

        self.owners[task] -= 1

        if not self.owners[task]:
            with self.__mutex:
                del self.owners[task]

                if not self.owners:
                    self.group = None
                    self.__release()

    def __release(self, /) -> None:
        # Hand the lock to the oldest waiting group: pop it FIFO, make
        # every queued task an owner (count 1) BEFORE setting its event,
        # then install the group.  Waking all tasks of the group at once
        # is the phase-fair batch wakeup.  Must be called under __mutex.
        if self.__waiters:
            group, events = self.__waiters.popitem(last=False)

            for task, event in events.items():
                self.owners[task] = 1
                event.set()

            self.group = group

    def async_owned(self, /) -> bool:
        """Return True if the current async task holds the lock."""
        return current_async_task_ident() in self.owners

    def green_owned(self, /) -> bool:
        """Return True if the current thread/greenlet holds the lock."""
        return current_green_task_ident() in self.owners

    def locked(self, /) -> bool:
        """Return True while any group holds the lock."""
        return self.group is not None

    @property
    def group(self, /) -> object:
        """The group currently holding the lock (None when unlocked).

        Wrapping locks read the root lock's state so all views agree.
        """
        if self.wrapped is not None:
            return self.wrapped._group
        else:
            return self._group

    @group.setter  # internal API!
    def group(self, /, value: object) -> None:
        if self.wrapped is not None:
            self.wrapped._group = value
        else:
            self._group = value

    @property
    def waiting(self, /) -> int:
        """Total number of tasks queued across all pending groups."""
        with self.__mutex:
            return sum(len(events) for events in self.__waiters.values())
@x42005e1f
Copy link
Author

x42005e1f commented Apr 12, 2025

This is an implementation of group-level locks, a new primitive derived from reentrant locks by adding methods to acquire the lock on behalf of a group (any hashable object). It is built on top of the aiologic package, but it is not included in the package itself, because it synchronizes its internal state via a mutex (which would violate aiologic's lock-free implementation guarantee).

GLock, a class of group-level locks, has quite rich functionality. First, it can be used as a typical aiologic-style RLock:

lock = GLock()

with lock:  # in a synchronous function (thread or greenlet)
    ...

async with lock:  # in an asynchronous function (coroutine)
    ...

    async with lock:  # can be reacquired too!
        ...

        with lock:  # but not like this! (causes deadlock)
            ...

Second, it can be used as a readers-writer lock (RWLock) that is both phase-fair and reentrant. This is the key feature for which this primitive was created. Just look at this example:

example.py
import asyncio
import sys

from glock import GLock

base = GLock()  # group-level!

reading = GLock(base, default_group="reading")
writing = GLock(base)


async def read(i):
    async with reading:
        print(f"> read #{i} (start)")

        await asyncio.sleep(0)  # emulate some work

        async with reading:
            print(f"* read #{i} (inner reading)")

            await asyncio.sleep(0)  # emulate some work

        await asyncio.sleep(0)  # emulate some work

        print(f"< read #{i} (end)")


async def write(i):
    async with writing:
        print(f"> write #{i} (start)")

        await asyncio.sleep(0)  # emulate some work

        async with reading:
            print(f"* write #{i} (inner reading)")

            await asyncio.sleep(0)  # emulate some work

        async with writing:
            print(f"* write #{i} (inner writing)")

            await asyncio.sleep(0)  # emulate some work

        await asyncio.sleep(0)  # emulate some work

        print(f"< write #{i} (end)")


async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(read(1))
        tg.create_task(read(2))
        tg.create_task(read(3))

        tg.create_task(write(1))
        tg.create_task(write(2))
        tg.create_task(write(3))

        tg.create_task(read(4))
        tg.create_task(read(5))
        tg.create_task(read(6))

        tg.create_task(write(4))
        tg.create_task(write(5))
        tg.create_task(write(6))

        tg.create_task(read(7))
        tg.create_task(read(8))
        tg.create_task(read(9))


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))
output.txt
> read #1 (start)
> read #2 (start)
> read #3 (start)
* read #1 (inner reading)
* read #2 (inner reading)
* read #3 (inner reading)
< read #1 (end)
< read #2 (end)
< read #3 (end)
> write #1 (start)
* write #1 (inner reading)
* write #1 (inner writing)
< write #1 (end)
> write #2 (start)
* write #2 (inner reading)
* write #2 (inner writing)
< write #2 (end)
> write #3 (start)
* write #3 (inner reading)
* write #3 (inner writing)
< write #3 (end)
> read #4 (start)
> read #5 (start)
> read #6 (start)
> read #7 (start)
> read #8 (start)
> read #9 (start)
* read #4 (inner reading)
* read #5 (inner reading)
* read #6 (inner reading)
* read #7 (inner reading)
* read #8 (inner reading)
* read #9 (inner reading)
< read #4 (end)
< read #5 (end)
< read #6 (end)
< read #7 (end)
< read #8 (end)
< read #9 (end)
> write #4 (start)
* write #4 (inner reading)
* write #4 (inner writing)
< write #4 (end)
> write #5 (start)
* write #5 (inner reading)
* write #5 (inner writing)
< write #5 (end)
> write #6 (start)
* write #6 (inner reading)
* write #6 (inner writing)
< write #6 (end)

Like readers-writer locks, it allows readers to work concurrently while ensuring exclusive access for writers. Like phase-fair readers-writer locks, all pending read operations are started as soon as the next writer exits (or immediately if there are none). And like reentrant readers-writer locks, it can be reacquired.

Note that the current implementation does not support upgrading & downgrading. Instead it just increments the counter, even if the reader wants to reacquire the lock as a writer. Be careful!

Third, it can be used for finer synchronization. For example, you can use GLock to perform thread-level synchronization when your code is coroutine-safe but not thread-safe:

example.py
import asyncio
import time

from threading import Thread

from aiologic.lowlevel import current_thread_ident

from glock import GLock

lock = GLock(default_group_factory=current_thread_ident)  # thread-level!


async def access(i, j):
    async with lock:
        print(f"-> thread={i} task={j} start")

        await asyncio.sleep(1)  # emulate some work

        print(f"<- thread={i} task={j} end")


async def main(i):
    async with asyncio.TaskGroup() as tg:
        tg.create_task(access(i, 1))
        tg.create_task(access(i, 2))
        tg.create_task(access(i, 3))


Thread(target=asyncio.run, args=[main(1)]).start()
time.sleep(0.5)  # allow all tasks from the first thread to acquire the lock
Thread(target=asyncio.run, args=[main(2)]).start()
output.txt
-> thread=1 task=1 start
-> thread=1 task=2 start
-> thread=1 task=3 start
<- thread=1 task=1 end
<- thread=1 task=2 end
<- thread=1 task=3 end
-> thread=2 task=1 start
-> thread=2 task=2 start
-> thread=2 task=3 start
<- thread=2 task=1 end
<- thread=2 task=2 end
<- thread=2 task=3 end

However, with a few modifications you can get even more features. For example, in the implementation you can find pieces of code like this:

if self.group is None or (
    self.group == group and not self.__waiters
):
    ...

They handle when a task can enter the current group. Right now it is "if there is no group (lock is unlocked) or the group is the same as the current task's group (e.g. 'reading') and there is no other pending group (e.g. writer)". Without the latter (and not self.__waiters), we would have read-preferring readers-writer locks instead of phase-fair readers-writer locks. But if you add to that, for example, and len(self.owners) <= concurrency, you can limit the maximum possible number of tasks in each group, as if it were a semaphore!

Replacing a single group (self.group) with multiple groups (self.groups) can give even more impressive results. This way you can control which groups can access a resource at the same time, or how many such groups can do so.

Thus, in addition to group-level locks, there are also group-level capacity limiters, which can be obtained by modifying this implementation. With this knowledge, GLock can actually be extended into a more general class, which we will call Grouper (by analogy with Phaser, a barrier-like primitive you may know from Java). Grouper will appear in aiologic 0.16.0/0.17.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment