Skip to content

Instantly share code, notes, and snippets.

@ahmetkotan
Last active October 20, 2023 08:54
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahmetkotan/32d76ef18ed85c5d3cc4ac8875b5e2c1 to your computer and use it in GitHub Desktop.
Save ahmetkotan/32d76ef18ed85c5d3cc4ac8875b5e2c1 to your computer and use it in GitHub Desktop.
Implement Celery to Multi-Tenant Architecture
# Standard Library
import logging
# Third Party
from celery import Celery
from celery.app.amqp import AMQP
from celery.app.task import Task
from django_tenants.utils import schema_context
# Django Stuff
from django.db import connection as db_connection
logger = logging.getLogger("celery")
class MyCustomTask(Task):
def __call__(self, *args, **kwargs):
headers = self.request.headers
if headers:
# Headers may not be available because amqp is not working in unit tests
schema_name = headers.get("schema_name", "public")
with schema_context(schema_name):
return super().__call__(*args, **kwargs)
return super().__call__(*args, **kwargs)
class MyCustomAMQP(AMQP):
def as_task_v2(self, *args, **kwargs):
task_message = super().as_task_v2(*args, **kwargs)
task_message.headers["schema_name"] = db_connection.schema_name
return task_message
app = Celery("proj", amqp=MyCustomAMQP, task_cls=MyCustomTask)
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment