Skip to content

Instantly share code, notes, and snippets.

@jsmitka
Last active December 22, 2023 13:22
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsmitka/6e2326d5f0443f3a060a6f73eae6f34e to your computer and use it in GitHub Desktop.
Save jsmitka/6e2326d5f0443f3a060a6f73eae6f34e to your computer and use it in GitHub Desktop.
Actor interfaces for Dramatiq
from abc import ABCMeta
from dramatiq import actor, Message, get_broker
def implementation_do_call(self, *args, **kwargs):
    """Forward a call on an actor implementation instance to ``perform``.

    This function is installed as ``__call__`` on implementation classes so
    that an instance of the class can be used as the actor callable.

    Args:
        self: The implementation instance being called.
        *args: Positional arguments passed through to ``perform``.
        **kwargs: Keyword arguments passed through to ``perform``.

    Returns:
        Whatever ``self.perform`` returns.
    """
    return self.perform(*args, **kwargs)
class BaseActorInterfaceMeta(ABCMeta):
    """Metaclass that distinguishes actor interfaces from implementations.

    A class whose ``perform`` method is abstract is treated as an *actor
    interface*: the actor name, queue name and remaining class keyword
    arguments are stored on the class.

    A class whose ``perform`` method is concrete is treated as an *actor
    implementation*: the class is instantiated, the instance is wrapped with
    ``dramatiq.actor`` and the **instance** (not the class) is returned from
    the class statement.

    A class without any ``perform`` attribute is returned unchanged.
    """

    # Queue name stored on an interface that does not pass ``queue_name``.
    DEFAULT_QUEUE_NAME = 'default'

    def __new__(mcs, name, bases, namespace, **kwargs):
        """Create the class and apply the interface/implementation handling.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            namespace: Class body namespace.
            **kwargs: Class keyword arguments. For an interface,
                ``actor_name`` (defaults to the class name) is removed from
                the mapping; everything else, including ``queue_name`` when
                given, is stored as ``actor_options``.

        Returns:
            The new class for interfaces and for classes without ``perform``;
            an instance of the new class for implementations.
        """
        cls = super().__new__(mcs, name, bases, namespace)

        # Classes without perform() are neither interfaces nor implementations.
        if not hasattr(cls, 'perform'):
            return cls

        perform_method = cls.perform

        # Actor Interface: perform() is still abstract.
        if getattr(perform_method, '__isabstractmethod__', False) is True:
            cls.actor_name = kwargs.pop('actor_name', cls.__name__)
            # queue_name is read with get() so that it also stays inside
            # actor_options and is later forwarded to actor().
            cls.queue_name = kwargs.get('queue_name', mcs.DEFAULT_QUEUE_NAME)
            cls.actor_options = kwargs
            return cls

        # Actor Implementation: perform() is concrete. The actor name and
        # options are read from the class, i.e. normally inherited from the
        # interface it derives from.
        # NOTE(review): class keyword arguments given on an implementation
        # class are not used in this branch - confirm that is intended.
        cls_instance = cls()
        actor_instance = actor(
            cls_instance,
            actor_name=cls.actor_name,
            **cls.actor_options
        )
        cls.__call__ = implementation_do_call
        cls.__actor__ = actor_instance
        return cls_instance
class BaseActorInterface(metaclass=BaseActorInterfaceMeta):
    """Base class for actor interfaces.

    Subclasses that declare an abstract ``perform`` method receive
    ``actor_name`` and ``queue_name`` class attributes from the metaclass;
    both class methods below read those attributes.
    """

    @classmethod
    def send(cls, *args, **kwargs):
        """Build a message for this actor and enqueue it on the broker.

        Args:
            *args: Positional arguments for the actor.
            **kwargs: Keyword arguments for the actor.

        Returns:
            The value returned by the broker's ``enqueue`` call, so callers
            keep a handle on the message that was queued.
        """
        message = cls.create_message(args, kwargs)
        return get_broker().enqueue(message)

    @classmethod
    def create_message(cls, args, kwargs, options=None):
        """Create a ``Message`` addressed to this actor without sending it.

        Args:
            args: Positional arguments for the actor.
            kwargs: Keyword arguments for the actor.
            options: Optional message options; an empty dict is used when
                ``None`` is given.

        Returns:
            A ``Message`` carrying this class's ``queue_name`` and
            ``actor_name``.
        """
        return Message(
            queue_name=cls.queue_name,
            actor_name=cls.actor_name,
            args=args,
            kwargs=kwargs,
            options=options if options is not None else {},
        )
from shared_library import TaskOneInterface, TaskTwoInterface
# Call TaskOne: send() is called on the interface class only, with the
# positional arguments 10 and 5.
TaskOneInterface.send(10, 5)
# Call TaskTwo: same pattern, with a single string argument.
TaskTwoInterface.send('hello!')
from abc import abstractmethod
from base_actor_interface import BaseActorInterface
class TaskOneInterface(BaseActorInterface, queue_name='test'):
    """Interface of the TaskOne actor, declared with ``queue_name='test'``."""

    @abstractmethod
    def perform(self, a: int, b: int) -> int:
        """Task body; must be overridden by the actor implementation.

        NOTE(review): the signature is annotated ``-> int`` while the TaskOne
        implementation in this file prints its result and returns ``None`` -
        confirm which contract is intended.

        Raises:
            NotImplementedError: Always, when this method body is executed.
        """
        raise NotImplementedError('The interface cannot be called directly!')
class TaskTwoInterface(BaseActorInterface):
    """Interface of the TaskTwo actor, declared without a ``queue_name``."""

    @abstractmethod
    def perform(self, val):
        """Task body; must be overridden by the actor implementation.

        Raises:
            NotImplementedError: Always, when this method body is executed.
        """
        raise NotImplementedError('The interface cannot be called directly!')
from shared_library import TaskOneInterface
class TaskOne(TaskOneInterface):
    """Concrete implementation of the TaskOne actor."""

    def perform(self, a: int, b: int):
        """Print the sum of ``a`` and ``b`` as ``"a + b = sum"``."""
        print(f'{a} + {b} = {a + b}')
from shared_library import TaskTwoInterface
class TaskTwo(TaskTwoInterface):
    """Concrete implementation of the TaskTwo actor."""

    def perform(self, val):
        """Print ``val`` prefixed with ``"Task 2 value: "``."""
        print(f'Task 2 value: {val}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment