Skip to content

Instantly share code, notes, and snippets.

@tomusher
Last active October 16, 2021 08:57
Show Gist options
  • Save tomusher/dad2c3976269939517f2e546a64b6aa6 to your computer and use it in GitHub Desktop.
Save tomusher/dad2c3976269939517f2e546a64b6aa6 to your computer and use it in GitHub Desktop.
Misc thoughts about deferred task API
class BaseTaskWrapper:
# Like a Future, Promise, Job, whatever...
status: TaskStatusEnum
@property
def response:
pass
# ----
# 'Default' Task Backend Example
# ----
class TaskWrapper(BaseTaskWrapper):
@property
def response:
return self._response
class ImmediateTaskBackend(BaseDeferredTaskBackend):
def defer(self, func, *args, **kwargs):
# Just call the function
ret = func(*args, **kwargs)
return TaskWrapper(_response=ret, status=TaskStatusEnum.COMPLETED)
# ----
# Contrib App Task Backend Example
# ----
from wagtail.contrib.cron_deferred.models import DeferredTask
class TaskWrapper(BaseTaskWrapper):
task_id: int
@property
def response:
return DeferredTask.objects.get(pk=task_id).response
class CronDeferredTaskBackend(BaseDeferredTaskBackend):
def defer(self, func, *args, **kwargs):
serialized_task = do_some_serialization(func, *args, **kwargs)
task = DeferredTask.create(spec=serialized_task)
return TaskWrapper(task_id=DeferredTask.pk)
...
# management command processes DeferredTasks on a cron
# ----
# Third Party/non-core RQ Backend Example
# ----
class TaskWrapper(BaseTaskWrapper):
job: rq.Job
@property
def response:
return self.job.response
class RQDeferredTaskBackend(BaseDeferredTaskBackend):
def defer(self, func, *args, **kwargs):
...
job = q.enqueue(func, *args, **kwargs)
return TaskWrapper(job=job)
# ----
# Example usage in core
# ----
from wagtail.core.deferred import defer
def send_notification(recipient_users, notification, extra_context):
defer(_send_notification, recipient_users, notification, extra_context)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment