Last active
October 20, 2023 08:54
-
-
Save ahmetkotan/32d76ef18ed85c5d3cc4ac8875b5e2c1 to your computer and use it in GitHub Desktop.
Implementing Celery in a Multi-Tenant Architecture
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard Library | |
import logging | |
# Third Party | |
from celery import Celery | |
from celery.app.amqp import AMQP | |
from celery.app.task import Task | |
from django_tenants.utils import schema_context | |
# Django Stuff | |
from django.db import connection as db_connection | |
# Module-level logger, registered under the name "celery".
logger = logging.getLogger("celery")
class MyCustomTask(Task):
    """Task base class that runs the task body inside a tenant schema.

    The schema name is read from the ``schema_name`` entry of the request
    headers; ``"public"`` is used when the headers lack that key.
    """

    def __call__(self, *args, **kwargs):
        """Run the task, wrapped in ``schema_context`` when headers exist.

        Args:
            *args: Positional arguments forwarded to the parent ``__call__``.
            **kwargs: Keyword arguments forwarded to the parent ``__call__``.

        Returns:
            Whatever the parent ``__call__`` returns.
        """
        headers = self.request.headers
        if not headers:
            # Headers may be missing (e.g. in unit tests, where AMQP is not
            # in play); run the task without switching schemas.
            return super().__call__(*args, **kwargs)

        schema_name = headers.get("schema_name", "public")
        with schema_context(schema_name):
            return super().__call__(*args, **kwargs)
class MyCustomAMQP(AMQP):
    """AMQP subclass that adds the current schema name to task messages."""

    def as_task_v2(self, *args, **kwargs):
        """Build the task message and add a ``schema_name`` header to it.

        All arguments are forwarded unchanged to the parent ``as_task_v2``.
        The header value is ``db_connection.schema_name`` as it stands when
        the message is built.

        Returns:
            The message returned by the parent, with the extra header set.
        """
        task_message = super().as_task_v2(*args, **kwargs)
        task_message.headers["schema_name"] = db_connection.schema_name
        return task_message
# Celery application wired to the custom AMQP and Task classes defined in
# this module.
app = Celery("proj", amqp=MyCustomAMQP, task_cls=MyCustomTask)

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object("django.conf:settings", namespace="CELERY")

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment