Skip to content

Instantly share code, notes, and snippets.

@eabruzzese
Last active April 1, 2020 18:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eabruzzese/00db30313309bfbff1b5a730982baa8e to your computer and use it in GitHub Desktop.
Save eabruzzese/00db30313309bfbff1b5a730982baa8e to your computer and use it in GitHub Desktop.
from services.service import BaseService, dispatch
from example.models import User, Notification
from email.services import send_email
def notify_user(user: User, message: str):
    """Function-based service: record a notification for ``user`` and email it.

    Args:
        user: The user the notification is created for; ``user.email`` is
            passed to the email service as the address.
        message: Text stored on the notification.
    """
    # New notifications start out unacknowledged.
    record = Notification.objects.create(user=user, message=message, acked=False)

    # Hand the stored message off to the email service via the dispatcher.
    dispatch(send_email, email=user.email, message=record.message)
class NotifyUser(BaseService):
    """A class-based service for sending a notification to a user.

    Fields:
        user: The user the notification is created for; ``user.email`` is
            passed to the email service as the address.
        message: Text stored on the notification.
    """

    # Parameters are declared as annotated fields instead of overriding a
    # constructor.
    # NOTE(review): this relies on dataclass processing reaching this
    # subclass. ``dataclasses.dataclass`` only handles the class it is applied
    # to, so confirm BaseService takes care of subclasses, or decorate this
    # class with ``@dataclass`` as well.
    user: User
    message: str

    def process(self):
        """Create an unacknowledged notification and email it to the user."""
        # The parameters live on the instance; bare ``user`` / ``message``
        # names are not in scope inside a method and would raise NameError.
        notification = Notification.objects.create(
            user=self.user,
            message=self.message,
            acked=False,
        )
        # Services can dispatch other services.
        dispatch(send_email, email=self.user.email, message=notification.message)
import inspect
from dataclasses import dataclass
@dataclass
class BaseService:
    """A base for class-based services.

    Subclasses declare their parameters as annotated class attributes and
    implement ``process()``. Parameters are given to the constructor as
    arguments; the constructor is generated by the dataclass decorator:
    https://docs.python.org/3/library/dataclasses.html
    """

    def __init_subclass__(cls, **kwargs):
        """Turn every subclass into a dataclass as soon as it is defined.

        ``dataclass`` only processes the class it is applied to, so without
        this hook a subclass's annotated fields would never become constructor
        parameters. A subclass that defines its own ``__init__`` keeps it.
        """
        super().__init_subclass__(**kwargs)
        dataclass(cls)

    def process(self):
        """Run the service. Subclasses must override this method.

        Raises:
            NotImplementedError: Always, when not overridden.
        """
        raise NotImplementedError("Class-based services must implement #process()")
def dispatch(service, **kwargs):
    """Run ``service`` and return its result.

    Args:
        service: A plain function, a ``BaseService`` subclass, or a
            ``BaseService`` instance.
        **kwargs: Passed to the function, or to the subclass constructor.
            They are not used when ``service`` is already an instance.

    Returns:
        Whatever the function call or the ``process()`` call returns.

    Raises:
        ValueError: If ``service`` is none of the supported kinds.
    """
    # Every service call, including a rejected one, happens inside the
    # metrics wrapper.
    with metrics_wrapper():
        # Function-based services are invoked directly.
        if inspect.isfunction(service):
            return service(**kwargs)

        # Service classes are instantiated with the kwargs, then processed.
        if inspect.isclass(service) and issubclass(service, BaseService):
            instance = service(**kwargs)
            return instance.process()

        # Ready-made service instances only need process() called.
        if isinstance(service, BaseService):
            return service.process()

        # Anything else is not a service.
        raise ValueError(
            "Services must be functions, classes that inherit from BaseService, "
            "or instances of BaseService"
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment