-
-
Save jonghwanhyeon/3c9ce95e665e6e036b193ec8c7d944e9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import annotations | |
import asyncio | |
from abc import ABC, abstractmethod | |
from typing import Any, Generic, Hashable, Optional, Type, TypeVar, Union | |
from typing_extensions import Concatenate, Protocol, Self, overload | |
from eventemitter.events import Events | |
from eventemitter.handlers import AbstractHandler, AsyncHandler, Handler | |
from eventemitter.types import AsyncListenable, Callable, Listenable, P, R_co, Returns, T | |
from eventemitter.utils import run_coroutine | |
# Listener type accepted by an emitter class (synchronous, asynchronous, or both).
L = TypeVar("L", bound=Union[Listenable, AsyncListenable])  # for classes
# Handler wrapper type an emitter class stores for each registered listener.
H = TypeVar("H", bound=AbstractHandler)
# Type of a single decorated function, so `on()` / `once()` decorators return exactly what they were given.
F = TypeVar("F", bound=Union[Listenable, AsyncListenable])  # for functions
# Reference: https://nodejs.org/api/events.html
class AbstractEventEmitter(ABC, Generic[L, H]):
    """An abstract class for EventEmitter classes.

    This class provides a generic implementation for `EventEmitter`s that can manage and emit listeners for events.
    All `EventEmitter`s emit the event `"new_listener"` when new listeners are added and `"remove_listener"` when
    existing listeners are removed.

    Subclasses must set `_handler_cls` and implement `emit()` and `_emit_until_complete()`.
    """

    __slots__ = ("_events",)

    # Handler wrapper class used to wrap raw listeners; provided by concrete subclasses.
    _handler_cls: Type[H]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes an instance of `AbstractEventEmitter`.

        Args:
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments
        """
        # To support cooperative multiple inheritance
        # Reference: https://rhettinger.wordpress.com/2011/05/26/super-considered-super/
        super().__init__(*args, **kwargs)

        self._events: Events[L, H] = Events[L, H]()

    def add_listener(self, event: Hashable, listener: L) -> Self:
        """Adds the `listener` function to the end of the listeners list for the event named `event`.

        Multiple calls passing the same combination of `event` and `listener` will result in the `listener`
        being added, and called, multiple times.

        Args:
            event: The name of the event
            listener: The callback function

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        return self._append_handler(event, self._handler_cls.from_func(listener, once=False))

    def prepend_listener(self, event: Hashable, listener: L) -> Self:
        """Adds the `listener` function to the *beginning* of the listeners list for the event named `event`.

        Multiple calls passing the same combination of `event` and `listener` will result in the `listener`
        being added, and called, multiple times.

        Args:
            event: The name of the event
            listener: The callback function

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        return self._prepend_handler(event, self._handler_cls.from_func(listener, once=False))

    def prepend_once_listener(self, event: Hashable, listener: L) -> Self:
        """Adds a **one-time** `listener` function for the event named `event` to the *beginning* of the listeners list.

        The next time `event` is triggered, this `listener` is removed, and then invoked.

        Args:
            event: The name of the event
            listener: The callback function

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        return self._prepend_handler(event, self._handler_cls.from_func(listener, once=True))

    @abstractmethod
    def emit(self, event: Hashable, *args: Any, **kwargs: Any) -> Returns[bool]:
        """Calls each of the listeners registered for the event named `event`, passing the supplied arguments to each.

        Args:
            event: The name of the event
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments

        Returns:
            `True` if the `event` had listeners, `False` otherwise.
        """
        raise NotImplementedError()

    def events(self) -> list[Hashable]:
        """Returns a list of the events for which the emitter has registered listeners.

        Returns:
            A list of the `event`s
        """
        return list(self._events.keys())

    def listeners(self, event: Hashable) -> list[L]:
        """Returns a copy of the list of listeners for the event named `event`.

        Args:
            event: The name of the event

        Returns:
            A copy of the list of listeners for the event named `event`.
        """
        return self._events.listeners(event)

    def off(self, event: Hashable, listener: L) -> Self:
        """Alias for `remove_listener()`.

        Args:
            event: The name of the event
            listener: The listener to remove

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        return self.remove_listener(event, listener)

    @overload
    def on(self, event: Hashable, listener: L) -> Self: ...

    @overload
    def on(self, event: Hashable) -> Callable[[F], F]: ...

    def on(self, event: Hashable, listener: Optional[F] = None) -> Union[Self, Callable[[F], F]]:
        """Adds the `listener` function to the end of the listeners list for the event named `event`.

        If no `listener` is provided, returns a decorator that adds the decorated function as a `listener` instead.

        Args:
            event: The name of the event
            listener: The callback function

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained if a `listener` is provided.
            Otherwise, returns a decorator that adds the decorated function to the listeners list
            for the event named `event`.
        """

        def decorator(func: F) -> F:
            self._append_handler(event, self._handler_cls.from_func(func, once=False))
            return func  # hand the function back unchanged so it stays usable under its own name

        if listener is None:
            return decorator

        decorator(listener)
        return self

    @overload
    def once(self, event: Hashable, listener: L) -> Self: ...

    @overload
    def once(self, event: Hashable) -> Callable[[F], F]: ...

    def once(self, event: Hashable, listener: Optional[F] = None) -> Union[Self, Callable[[F], F]]:
        """Adds a **one-time** `listener` function for the event named `event`.

        If no `listener` is provided, returns a decorator that adds the decorated function
        as a **one-time** `listener` instead.

        Args:
            event: The name of the event
            listener: The callback function

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained if a `listener` is provided.
            Otherwise, returns a decorator that adds the decorated function as a **one-time** `listener`
            for the event named `event`.
        """

        def decorator(func: F) -> F:
            self._append_handler(event, self._handler_cls.from_func(func, once=True))
            return func  # hand the function back unchanged so it stays usable under its own name

        if listener is None:
            return decorator

        decorator(listener)
        return self

    def remove_all_listeners(self, event: Optional[Hashable] = None) -> Self:
        """Removes all listeners, or those of the specified `event`.

        When no `event` is given, the listeners of `"remove_listener"` are removed last, so that they are
        still notified about the removal of every other listener (this mirrors the Node.js behaviour).

        Args:
            event: The name of the event

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        if event is None:
            registered = self.events()  # snapshot: the registry shrinks while we remove
            targets = [name for name in registered if name != "remove_listener"]
            if "remove_listener" in registered:
                targets.append("remove_listener")
        else:
            targets = [event]

        for name in targets:
            # Most recently added handlers go first.
            for handler in reversed(self._events.handlers(name)):
                self._remove_handler(name, handler)

        return self

    def remove_listener(self, event: Hashable, listener: L) -> Self:
        """Removes the specified `listener` from the listener list for the event named `event`.

        `remove_listener()` will remove, at most, one instance of a listener from the listener list.
        If any single listener has been added multiple times to the listener list for the specified `event`,
        then `remove_listener()` must be called multiple times to remove each instance.

        Once an event is emitted, all listeners attached to it at the time of emitting are called in order.
        This implies that any `remove_listener()` or `remove_all_listeners()` calls after emitting and before
        the last listener finishes execution will not remove them from emit() in progress.
        Subsequent events behave as expected.

        When a single function has been added as a handler multiple times for a single event,
        `remove_listener()` will remove the most recently added instance.

        Args:
            event: The name of the event
            listener: The listener to remove

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        return self._remove_handler(event, listener)

    def _append_handler(self, event: Hashable, handler: H) -> Self:
        """Emits `"new_listener"` and then registers `handler` at the end of the list for `event`."""
        # The notification is emitted *before* the handler is stored.
        self._emit_until_complete("new_listener", event, handler.func)
        self._events[event].append(handler)
        return self

    def _prepend_handler(self, event: Hashable, handler: H) -> Self:
        """Emits `"new_listener"` and then registers `handler` at the beginning of the list for `event`."""
        # The notification is emitted *before* the handler is stored.
        self._emit_until_complete("new_listener", event, handler.func)
        self._events[event].prepend(handler)
        return self

    @overload
    def _remove_handler(self, event: Hashable, target: H) -> Self: ...

    @overload
    def _remove_handler(self, event: Hashable, target: Union[Listenable, AsyncListenable]) -> Self: ...

    def _remove_handler(self, event: Hashable, target: Union[H, Listenable, AsyncListenable]) -> Self:
        """Removes one handler from `event` and emits `"remove_listener"` for it.

        Args:
            event: The name of the event
            target: Either a handler instance or the raw listener the handler wraps

        Returns:
            A reference to the `EventEmitter`, so that calls can be chained.
        """
        if event not in self._events:
            return self

        try:
            if isinstance(target, self._handler_cls):
                handler = self._events[event].remove(target, last=True)
            else:
                handler = self._events[event].remove_by_id(target, last=True)
        except ValueError:
            # Presumably raised when `target` is not registered for `event`; nothing was removed.
            pass
        else:
            # Kept outside the `try` so that a `ValueError` raised by a "remove_listener"
            # listener propagates to the caller instead of being silently swallowed.
            self._emit_until_complete("remove_listener", event, handler.func)

        # Drop the event entirely once its last handler is gone.
        if not self._events[event]:
            del self._events[event]

        return self

    @abstractmethod
    def _emit_until_complete(self, event: Hashable, *args: Any, **kwargs: Any) -> None:
        """Emits `event` for the internal `"new_listener"` / `"remove_listener"` notifications.

        Args:
            event: The name of the event
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments
        """
        ...
class EventEmitter(AbstractEventEmitter[Listenable, Handler]):
    """A synchronous `EventEmitter`: listeners run one after another in the caller's thread of control.

    Listeners can be added to and removed from named events, and events can be emitted with arbitrary arguments.
    Adding a listener emits a `"new_listener"` event; removing an existing listener emits
    a `"remove_listener"` event.
    """

    _handler_cls = Handler
    _executor = staticmethod(lambda func, *args, **kwargs: func(*args, **kwargs))

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes an instance of `EventEmitter`.

        Args:
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments
        """
        super().__init__(*args, **kwargs)

    def emit(self, event: Hashable, *args: Any, **kwargs: Any) -> bool:
        """Invokes every listener registered for `event`, in registration order, with the supplied arguments.

        Args:
            event: The name of the event
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments

        Returns:
            `True` if the `event` had listeners, `False` otherwise.
        """
        if event in self._events:
            for registered in self._events.handlers(event):
                # A one-time handler is unregistered first and only then invoked.
                if registered.once:
                    self._remove_handler(event, registered)
                registered(*args, **kwargs)
            return True

        return False

    def _emit_until_complete(self, event: Hashable, *args: Any, **kwargs: Any) -> None:
        """Emits `event` synchronously and discards the result."""
        self.emit(event, *args, **kwargs)
class AsyncIOEventEmitter(AbstractEventEmitter[Union[Listenable, AsyncListenable], AsyncHandler]):
    """An `AsyncIOEventEmitter` class that executes listeners asynchronously.

    This class allows you to add and remove listeners for specified events and emit those events
    with arbitrary arguments in a non-blocking manner.
    When a new listener is added, the `AsyncIOEventEmitter` emits a `"new_listener"` event.
    When an existing listener is removed, it emits a `"remove_listener"` event.
    """

    _handler_cls = AsyncHandler

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes an instance of `AsyncIOEventEmitter`.

        Args:
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments
        """
        super().__init__(*args, **kwargs)

    async def emit(self, event: Hashable, *args: Any, **kwargs: Any) -> bool:
        """Calls each of the listeners registered for the event named `event`, simultaneously.

        The supplied arguments are passed to each listener. All listeners are awaited together
        via `asyncio.gather()`.

        Args:
            event: The name of the event
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments

        Returns:
            `True` if the `event` had listeners, `False` otherwise.
        """
        if event not in self._events:
            return False

        # A list (not a set) keeps the awaitables in registration order, so `gather()` receives
        # them in a deterministic order and identical awaitables are never collapsed.
        pending = []
        for handler in self._events.handlers(event):
            # A one-time handler is unregistered first and only then invoked.
            if handler.once:
                self._remove_handler(event, handler)
            pending.append(handler(*args, **kwargs))

        await asyncio.gather(*pending)
        return True

    async def emit_in_order(self, event: Hashable, *args: Any, **kwargs: Any) -> bool:
        """Calls each of the listeners registered for the event named `event`, in the order they were registered.

        The supplied arguments are passed to each listener. Each listener is awaited to completion
        before the next one is called.

        Args:
            event: The name of the event
            *args: Arbitrary positional arguments
            **kwargs: Arbitrary keyword arguments

        Returns:
            `True` if the `event` had listeners, `False` otherwise.
        """
        if event not in self._events:
            return False

        for handler in self._events.handlers(event):
            # A one-time handler is unregistered first and only then invoked.
            if handler.once:
                self._remove_handler(event, handler)
            await handler(*args, **kwargs)

        return True

    def _emit_until_complete(self, event: Hashable, *args: Any, **kwargs: Any) -> None:
        """Emits `event` from synchronous code by handing `emit` to the `run_coroutine` helper."""
        run_coroutine(self.emit, event, *args, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment