A group-level lock (async-aware & thread-aware)
#!/usr/bin/env python3

# SPDX-FileCopyrightText: 2025 Ilya Egorov <0x42005e1f@gmail.com>
# SPDX-License-Identifier: ISC

from __future__ import annotations

from collections import OrderedDict
from operator import methodcaller
from typing import TYPE_CHECKING

from aiologic.lowlevel import (
    Event,
    async_checkpoint,
    current_async_task_ident,
    current_green_task_ident,
    green_checkpoint,
)
from aiologic.lowlevel._thread import LockType, allocate_lock

try:
    from aiologic.lowlevel import create_async_event, create_green_event
except ImportError:  # aiologic<0.15.0
    from aiologic.lowlevel import (
        AsyncEvent as create_async_event,
        GreenEvent as create_green_event,
    )

try:
    from aiologic.lowlevel import (
        async_checkpoint_enabled,
        green_checkpoint_enabled,
    )
except ImportError:  # aiologic<0.15.0

    def async_checkpoint_enabled() -> bool:
        return True

    def green_checkpoint_enabled() -> bool:
        return True

if TYPE_CHECKING:
    import sys
    from types import TracebackType

    if sys.version_info >= (3, 11):
        from typing import Self
    else:
        from typing_extensions import Self

    if sys.version_info >= (3, 9):
        from collections.abc import Callable
    else:
        from typing import Callable

if hasattr(Event, "cancelled"):
    cancelled = methodcaller("cancelled")
else:  # aiologic<0.15.0
    cancelled = methodcaller("cancel")


# A group-level lock: tasks that acquire it on behalf of the same group share
# the lock (and each owner may reacquire it), while other groups wait.
class GLock:
    __slots__ = (
        "__mutex",
        "__waiters",
        "__weakref__",
        "_default_group",
        "_default_group_factory",
        "_group",
        "owners",
        "wrapped",
    )

    __mutex: LockType
    __waiters: OrderedDict[object, dict[tuple[str, int], Event]]

    _default_group: object
    _default_group_factory: Callable[[], object] | None
    _group: object

    owners: dict[tuple[str, int], int]
    wrapped: GLock | None

    def __init__(
        self,
        /,
        wrapped: GLock | None = None,
        *,
        default_group: object = None,
        default_group_factory: Callable[[], object] | None = None,
    ) -> None:
        if default_group is not None and default_group_factory is not None:
            msg = "cannot specify both default_group and default_group_factory"
            raise ValueError(msg)

        self._default_group = default_group
        self._default_group_factory = default_group_factory

        if wrapped is not None:
            # Share all state with the wrapped lock; only the defaults differ.
            self.__mutex = wrapped.__mutex
            self.__waiters = wrapped.__waiters

            self.owners = wrapped.owners
            self.wrapped = wrapped
        else:
            self.__mutex = allocate_lock()
            self.__waiters = OrderedDict()

            self._group = None

            self.owners = {}
            self.wrapped = None

    def __repr__(self, /) -> str:
        cls = self.__class__
        cls_repr = f"{cls.__module__}.{cls.__qualname__}"

        return f"{cls_repr}()"

    def __bool__(self, /) -> bool:
        return self.group is not None

    async def __aenter__(self, /) -> Self:
        await self.async_acquire()

        return self

    async def __aexit__(
        self,
        /,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.async_release()

    def __enter__(self, /) -> Self:
        self.green_acquire()

        return self

    def __exit__(
        self,
        /,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        self.green_release()

    async def async_acquire_on_behalf_of(
        self,
        /,
        group: object,
        *,
        blocking: bool = True,
    ) -> bool:
        task = current_async_task_ident()

        if task in self.owners:  # reentrant acquisition by a current owner
            if blocking:
                await async_checkpoint()

            self.owners[task] += 1

            return True

        if group is None:
            if self._default_group_factory is not None:
                group = self._default_group_factory()
            elif self._default_group is not None:
                group = self._default_group
            else:
                group = task

        mutex = self.__mutex
        mutex.acquire()
        mutex_acquired = True

        try:
            if self.group is None or (
                self.group == group and not self.__waiters
            ):
                if blocking and async_checkpoint_enabled():
                    mutex_acquired = False
                    mutex.release()

                    await async_checkpoint()

                    mutex.acquire()
                    mutex_acquired = True

                # Re-check: the state may have changed while the mutex was
                # released for the checkpoint.
                if self.group is None or (
                    self.group == group and not self.__waiters
                ):
                    self.group = group
                    self.owners[task] = 1

                    return True

            if not blocking:
                return False

            events = self.__waiters.setdefault(group, {})
            events[task] = event = create_async_event()

            success = False
            mutex_acquired = False
            mutex.release()

            try:
                success = await event
            finally:
                if not success:
                    mutex.acquire()
                    mutex_acquired = True

                    if cancelled(event):
                        del events[task]

                        if not events and self.__waiters.get(group) is events:
                            del self.__waiters[group]
                    else:
                        # The event was already set, so ownership was already
                        # passed to this task; give it back.
                        del self.owners[task]

                        if not self.owners:
                            self.group = None

                            self.__release()

            return success
        finally:
            if mutex_acquired:
                mutex.release()

    def green_acquire_on_behalf_of(
        self,
        /,
        group: object,
        *,
        blocking: bool = True,
        timeout: float | None = None,
    ) -> bool:
        task = current_green_task_ident()

        if task in self.owners:  # reentrant acquisition by a current owner
            if blocking:
                green_checkpoint()

            self.owners[task] += 1

            return True

        if group is None:
            if self._default_group_factory is not None:
                group = self._default_group_factory()
            elif self._default_group is not None:
                group = self._default_group
            else:
                group = task

        mutex = self.__mutex
        mutex.acquire()
        mutex_acquired = True

        try:
            if self.group is None or (
                self.group == group and not self.__waiters
            ):
                if blocking and green_checkpoint_enabled():
                    mutex_acquired = False
                    mutex.release()

                    green_checkpoint()

                    mutex.acquire()
                    mutex_acquired = True

                # Re-check: the state may have changed while the mutex was
                # released for the checkpoint.
                if self.group is None or (
                    self.group == group and not self.__waiters
                ):
                    self.group = group
                    self.owners[task] = 1

                    return True

            if not blocking:
                return False

            events = self.__waiters.setdefault(group, {})
            events[task] = event = create_green_event()

            success = False
            mutex_acquired = False
            mutex.release()

            try:
                success = event.wait(timeout)
            finally:
                if not success:
                    mutex.acquire()
                    mutex_acquired = True

                    if cancelled(event):
                        del events[task]

                        if not events and self.__waiters.get(group) is events:
                            del self.__waiters[group]
                    else:
                        # The event was already set, so ownership was already
                        # passed to this task; give it back.
                        del self.owners[task]

                        if not self.owners:
                            self.group = None

                            self.__release()

            return success
        finally:
            if mutex_acquired:
                mutex.release()

    async def async_acquire(self, /, *, blocking: bool = True) -> bool:
        return await self.async_acquire_on_behalf_of(None, blocking=blocking)

    def green_acquire(
        self,
        /,
        *,
        blocking: bool = True,
        timeout: float | None = None,
    ) -> bool:
        return self.green_acquire_on_behalf_of(
            None,
            blocking=blocking,
            timeout=timeout,
        )

    def async_release(self, /) -> None:
        if self.group is None:
            msg = "release unlocked lock"
            raise RuntimeError(msg)

        task = current_async_task_ident()

        if task not in self.owners:
            msg = "the current task is not holding this lock"
            raise RuntimeError(msg)

        self.owners[task] -= 1

        if not self.owners[task]:
            with self.__mutex:
                del self.owners[task]

                if not self.owners:
                    self.group = None

                    self.__release()

    def green_release(self, /) -> None:
        if self.group is None:
            msg = "release unlocked lock"
            raise RuntimeError(msg)

        task = current_green_task_ident()

        if task not in self.owners:
            msg = "the current task is not holding this lock"
            raise RuntimeError(msg)

        self.owners[task] -= 1

        if not self.owners[task]:
            with self.__mutex:
                del self.owners[task]

                if not self.owners:
                    self.group = None

                    self.__release()

    def __release(self, /) -> None:
        # Hand the lock over to the whole next pending group at once.
        if self.__waiters:
            group, events = self.__waiters.popitem(last=False)

            for task, event in events.items():
                self.owners[task] = 1

                event.set()

            self.group = group

    def async_owned(self, /) -> bool:
        return current_async_task_ident() in self.owners

    def green_owned(self, /) -> bool:
        return current_green_task_ident() in self.owners

    def locked(self, /) -> bool:
        return self.group is not None

    @property
    def group(self, /) -> object:
        if self.wrapped is not None:
            return self.wrapped._group
        else:
            return self._group

    @group.setter  # internal API!
    def group(self, /, value: object) -> None:
        if self.wrapped is not None:
            self.wrapped._group = value
        else:
            self._group = value

    @property
    def waiting(self, /) -> int:
        with self.__mutex:
            return sum(len(events) for events in self.__waiters.values())
This is an implementation of group-level locks, a new primitive derived from reentrant locks by adding methods that acquire the lock on behalf of a group (any hashable object). It is built on top of the aiologic package, but it is not included in it, because it synchronizes its internal state with a mutex (which conflicts with aiologic's lock-free implementation).
GLock, a class of group-level locks, has quite rich functionality. First, it can be used as a typical aiologic-style RLock:
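A minimal sketch of that usage (not the gist's own example; it assumes the file above is saved as glock.py and aiologic is installed):

import asyncio

from glock import GLock

lock = GLock()

async def work(name: str) -> None:
    async with lock:
        async with lock:  # reentrant: the owning task may acquire it again
            print(name, "is holding the lock")
            await asyncio.sleep(0.1)

async def main() -> None:
    await asyncio.gather(work("first"), work("second"))

asyncio.run(main())

Each task is its own group here, so the lock is exclusive between tasks but can be reacquired by the task that already holds it.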
Second, it can be used as a readers-writer lock (RWLock) that is both phase-fair and reentrant. This is the key feature for which this primitive was created. Just look at this example:
(example.py and output.txt are attached to the gist.)
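The attachment is not reproduced here, but one plausible way to wire GLock up as a readers-writer lock looks like the sketch below (again assuming the file above is saved as glock.py; the names write_lock and read_lock are not from the gist). Writers use the base lock, so each write is exclusive, while readers use a wrapping view whose default group is "read", so any number of readers may hold the lock at the same time.

import asyncio

from glock import GLock

write_lock = GLock()
read_lock = GLock(write_lock, default_group="read")  # shares the base lock's state

async def reader(i: int) -> None:
    async with read_lock:
        print(f"reader {i} is reading")
        await asyncio.sleep(0.1)

async def writer(i: int) -> None:
    async with write_lock:
        print(f"writer {i} is writing")
        await asyncio.sleep(0.1)

async def main() -> None:
    await asyncio.gather(reader(1), reader(2), writer(1), reader(3))

asyncio.run(main())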
Like readers-writer locks, it allows readers to work concurrently while ensuring exclusive access for writers. Like phase-fair readers-writer locks, all pending read operations start as soon as the current writer releases the lock (or immediately, if there is no writer). And like reentrant readers-writer locks, it can be reacquired.
Note that the current implementation does not support upgrading & downgrading: reacquiring only increments the per-task counter, even if a reader wants to reacquire the lock as a writer. Be careful!
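For instance, reusing the hypothetical write_lock/read_lock pair from the sketch above, the inner acquisition below does not grant exclusive access:

async def reader_turned_writer() -> None:
    async with read_lock:        # acquired on behalf of the "read" group
        async with write_lock:   # no upgrade: this only bumps the counter,
            ...                  # other readers may still be running here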
Third, it can be used for finer-grained synchronization. For example, you can use GLock to perform thread-level synchronization when your code is coroutine-safe but not thread-safe:
(example.py and output.txt are attached to the gist.)
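The attached example is not reproduced here either, but the idea might look like the following sketch (assuming glock.py as before): every task's group is the id of the thread it runs in, so tasks within one thread share the lock while tasks running in other threads have to wait.

import asyncio
import threading

from glock import GLock

thread_lock = GLock(default_group_factory=threading.get_ident)

async def use_thread_unsafe_resource(name: str) -> None:
    async with thread_lock:
        # Tasks of the current thread interleave here freely; tasks running
        # in event loops of other threads are kept out.
        print(name, "running in thread", threading.get_ident())
        await asyncio.sleep(0.1)

async def main(prefix: str) -> None:
    await asyncio.gather(
        *(use_thread_unsafe_resource(f"{prefix}-{i}") for i in range(3))
    )

threads = [
    threading.Thread(target=asyncio.run, args=(main(f"loop{n}"),))
    for n in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()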
However, with a few modifications you can get even more features. For example, in the implementation you can find pieces of code like this:
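The check in question, as it appears in both acquire methods above:

if self.group is None or (
    self.group == group and not self.__waiters
):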
This check decides when a task may enter the current group. Right now the rule is: proceed if there is no group at all (the lock is unlocked), or the requested group is the same as the current one (e.g. "reading") and there is no other pending group (e.g. a writer). Without the latter part (and not self.__waiters) we would get read-preferring readers-writer locks instead of phase-fair ones. And if you add, for example, and len(self.owners) <= concurrency, you can limit the maximum possible number of tasks in each group, as if it were a semaphore! A sketch of that variant follows.
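A minimal sketch of the semaphore-like variant (concurrency is an assumed per-lock limit, e.g. stored in __init__; it is not part of the gist):

# hypothetical: also cap how many tasks of a group may hold the lock at once
if self.group is None or (
    self.group == group
    and not self.__waiters
    and len(self.owners) <= concurrency
):
    ...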
Replacing the single group (self.group) with multiple groups (self.groups) can give even more impressive results: this way you can control which groups can access a resource at the same time, or how many such groups can do so. Thus, in addition to group-level locks, there are also group-level capacity limiters, which can be obtained by modifying GLock. With this in mind, GLock can actually be generalized to a broader class, let's call it Grouper (by analogy with Phaser, a barrier-like primitive you may know from Java). Grouper will appear in aiologic 0.16.0/0.17.0.