Created
November 15, 2019 01:53
-
-
Save isra17/3850567bb0adbab743ba9611a1bc47d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import partial, update_wrapper | |
from typing import Callable, Optional, Any, List, cast, Union, overload | |
from typing_extensions import Literal | |
import dramatiq | |
class LazyActor(object): | |
_actor_attributes = [ | |
"message", | |
"message_with_options", | |
"send", | |
"send_with_options", | |
"logger", | |
"fn", | |
"broker", | |
"actor_name", | |
"queue_name", | |
"priority", | |
"options", | |
] | |
tasks: List["LazyActor"] = [] | |
def __init__(self, fn: Callable, **kwargs: Any) -> None: | |
self._actor: Optional[dramatiq.Actor] = None | |
self.fn = fn | |
self.options = kwargs | |
self.tasks.append(self) | |
update_wrapper(self, self.fn) | |
def __call__(self, *args: Any, **kwargs: Any) -> Any: | |
return self.fn(*args, **kwargs) | |
def __getattr__(self, key: str) -> Any: | |
if key in self._actor_attributes: | |
return getattr(self.actor, key) | |
raise AttributeError() | |
@property | |
def actor(self) -> dramatiq.Actor: | |
self._init_actor() | |
return cast(dramatiq.Actor, self._actor) | |
@classmethod | |
def load_all(cls, broker: Optional[dramatiq.Broker] = None) -> None: | |
if broker: | |
dramatiq.set_broker(broker) | |
for task in cls.tasks: | |
task._init_actor() | |
def _init_actor(self) -> None: | |
if self._actor is None: | |
self._actor = dramatiq.actor(**self.options)(self.fn) | |
ActorFactory = Callable[[Callable], dramatiq.Actor] | |
@overload | |
def actor(func: Literal[None], **kwargs: Any) -> ActorFactory: | |
... | |
@overload | |
def actor(func: Callable, **kwargs: Any) -> dramatiq.Actor: | |
... | |
def actor( | |
func: Optional[Callable] = None, **kwargs: Any | |
) -> Union[ActorFactory, dramatiq.Actor]: | |
if func is None: | |
return cast(ActorFactory, partial(LazyActor, **kwargs)) | |
return cast(dramatiq.Actor, LazyActor(func, **kwargs)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment