Skip to content

Instantly share code, notes, and snippets.

@rvause
Last active March 25, 2020 11:30
Show Gist options
  • Save rvause/e1de12430408f8193a0eb1d80ee9a0e6 to your computer and use it in GitHub Desktop.
Save rvause/e1de12430408f8193a0eb1d80ee9a0e6 to your computer and use it in GitHub Desktop.
# models.py
from .tasks import send_event
class Event(models.Model):
    """Event that can be sent inline or handed off to the ``send_event`` task."""

    # When true, ``send()`` dispatches through ``send_event.delay`` instead of
    # calling ``_send()`` directly.
    is_async = models.BooleanField(default=False)
    # ...

    def _send(self):
        """Perform the actual send; used by ``send()`` and by ``send_event``."""
        # do sending stuff

    def send(self):
        """Send this event, either via the task or inline.

        Returns:
            Whatever ``send_event.delay`` returns when ``is_async`` is set,
            otherwise the return value of ``_send()``.
        """
        if self.is_async:
            # Only the primary key is handed over; ``send_event`` looks the
            # row up again by ``pk`` before sending.
            return send_event.delay(self.pk)
        return self._send()
# tasks.py
from django.apps import apps
def send_event(pk):
    """Look up the event with primary key ``pk`` and send it.

    The model class is resolved through the app registry
    (``apps.get_model("events.Event")``) instead of being imported from
    ``.models``.
    NOTE(review): presumably this avoids a circular import, since models.py
    imports this module at load time -- confirm.

    Args:
        pk: Primary key of the event to send.

    Returns:
        None. A missing event is ignored silently.
    """
    event_model = apps.get_model("events.Event")
    try:
        event = event_model.objects.get(pk=pk)
    except event_model.DoesNotExist:
        # No such row; there is nothing to send.
        return
    # Kept outside the ``try`` so a ``DoesNotExist`` raised while sending is
    # reported instead of being mistaken for a missing event.
    event._send()
# models.py
class Event(models.Model):
    """Event that can be sent inline or handed off to the ``send_event`` task."""

    # When true, ``send()`` dispatches through ``send_event.delay`` instead of
    # calling ``_send()`` directly.
    is_async = models.BooleanField(default=False)
    # ...

    def _send(self):
        """Perform the actual send; used by ``send()`` and by ``send_event``."""
        # do sending stuff

    def send(self):
        """Send this event, either via the task or inline.

        Returns:
            Whatever ``send_event.delay`` returns when ``is_async`` is set,
            otherwise the return value of ``_send()``.
        """
        if self.is_async:
            # Imported at call time on purpose: tasks.py imports ``Event``
            # from this module, so a module-level import here would be
            # circular.
            from .tasks import send_event

            return send_event.delay(self.pk)
        return self._send()
# tasks.py
from .models import Event
def send_event(pk):
    """Look up the ``Event`` with primary key ``pk`` and send it.

    Args:
        pk: Primary key of the event to send.

    Returns:
        None. A missing event is ignored silently.
    """
    try:
        event = Event.objects.get(pk=pk)
    except Event.DoesNotExist:
        # ``Event`` is the only model name in scope here; the previous
        # ``event_model.DoesNotExist`` raised NameError whenever the lookup
        # failed. No such row means there is nothing to send.
        return
    # Kept outside the ``try`` so a ``DoesNotExist`` raised while sending is
    # reported instead of being mistaken for a missing event.
    event._send()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment