Last active
November 16, 2023 00:29
-
-
Save tomfa/4a389acfce393c2d7b54940e14613106 to your computer and use it in GitHub Desktop.
Django Elasticsearch with Celery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import shared_task | |
from django.db import models, transaction | |
from django.utils import timezone | |
from django_elasticsearch_dsl.apps import DEDConfig | |
from django_elasticsearch_dsl.registries import registry | |
from django_elasticsearch_dsl.signals import BaseSignalProcessor | |
from .. import utils | |
class CelerySignalProcessor(BaseSignalProcessor):
    """Signal processor that defers index updates to Celery tasks.

    Saves of registered models are re-indexed by a delayed background
    task rather than inline in the signal handler.

    NB: Deletes are still processed synchronously. By the time a Celery
    worker picked up a delete job, the model instance would already be
    deleted.

    Intended for use together with django_elasticsearch_dsl, see
    https://github.com/sabricot/django-elasticsearch-dsl

    Enable it in settings.py:

        ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = (
            '<your-project>.<your-app>.signals.CelerySignalProcessor'
        )
    """

    # Sentinel handed to the task when the saved instance has no
    # ``indexed_at`` attribute at all.
    NO_INDEXED_AT_FIELD = -1

    def handle_save(self, sender, instance, **kwargs):
        """Queue a re-index task for ``instance``.

        Nothing is queued (and ``None`` is returned) when auto-sync is
        disabled or the instance's class is not among the registry's
        models. Otherwise the result of ``apply_async`` is returned.
        """
        if (
            not DEDConfig.autosync_enabled()
            or instance.__class__ not in registry.get_models()
        ):
            return

        task_args = (
            utils.serialize_model(sender),
            utils.serialize_model_instance(instance),
            getattr(instance, 'indexed_at', self.NO_INDEXED_AT_FIELD),
        )
        # The task is scheduled with a 10 second countdown.
        return self.async_handle_save.apply_async(task_args, countdown=10)

    @staticmethod
    def perform_elastic_update(instance):
        """Update the registry for ``instance`` and for its related objects."""
        registry.update(instance)
        registry.update_related(instance)

    @shared_task(bind=True)
    def async_handle_save(self, sender, instance, indexed_at):
        """Celery task body: re-index a serialized model instance.

        ``sender`` and ``instance`` are the serialized forms produced by
        ``utils``; ``indexed_at`` is the value captured when the task was
        queued, or ``NO_INDEXED_AT_FIELD``. Because the task is declared
        with ``bind=True``, ``self`` is the task, not the processor, so
        class members are reached through ``CelerySignalProcessor``.
        """
        # The sender class is resolved but not otherwise used here.
        utils.deserialize_model(sender)  # noqa

        processor = CelerySignalProcessor
        if indexed_at == processor.NO_INDEXED_AT_FIELD:
            # No ``indexed_at`` bookkeeping on this model: always re-index.
            processor.perform_elastic_update(
                utils.deserialize_model_instance(instance)
            )
            return

        with transaction.atomic():
            fetched_at = timezone.now()
            locked_instance = utils.deserialize_model_instance(
                instance, select_for_update=True
            )
            # Skip the work if the instance was indexed after this task
            # was queued.
            if not utils.has_been_indexed_since(
                locked_instance, timestamp=indexed_at
            ):
                processor.perform_elastic_update(locked_instance)
                utils.update_indexed_at(locked_instance, timestamp=fetched_at)

    def setup(self):
        """Connect the handlers to the model signals."""
        for signal, receiver in (
            # Listen to all model saves and deletes.
            (models.signals.post_save, self.handle_save),
            (models.signals.post_delete, self.handle_delete),
            # Used to manage related objects update.
            (models.signals.m2m_changed, self.handle_m2m_changed),
            (models.signals.pre_delete, self.handle_pre_delete),
        ):
            signal.connect(receiver)

    def teardown(self):
        """Disconnect every handler that ``setup`` connected."""
        for signal, receiver in (
            (models.signals.post_save, self.handle_save),
            (models.signals.post_delete, self.handle_delete),
            (models.signals.m2m_changed, self.handle_m2m_changed),
            (models.signals.pre_delete, self.handle_pre_delete),
        ):
            signal.disconnect(receiver)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from typing import Dict, Optional | |
from django.db.models import Model | |
from django.db.models.base import ModelBase | |
from django.utils import dateparse | |
def serialize_model(model: Optional[ModelBase]) -> Optional[Dict[str, str]]:
    """
    Turn a Django model class into a small dict (app label and model name)
    that can be passed to Celery workers.

    Returns None when ``model`` is not a model class.
    """
    if isinstance(model, ModelBase):
        meta = model._meta
        return {'app': meta.app_label, 'model': meta.model_name}
    return None
def deserialize_model(
    serialized_model: Optional[Dict[str, str]]
) -> Optional[ModelBase]:
    """
    Resolve a dict produced by ``serialize_model`` back to the model class.

    Returns None when the dict is empty/None or lacks an 'app' or 'model'
    entry. The lookup itself is delegated to ``apps.get_model``, which
    raises ``LookupError`` for an unknown app or model.

    The return value is the model *class* (an instance of ``ModelBase``),
    not a model instance.
    """
    # Kept as a function-local import.
    from django.apps import apps

    if not serialized_model:
        return None
    app_label = serialized_model.get('app')
    model_name = serialized_model.get('model')
    if not (app_label and model_name):
        return None
    return apps.get_model(app_label, model_name)
def serialize_model_instance(
    instance: Optional[Model]
) -> Optional[Dict[str, str]]:
    """
    Serialize a Django model instance to a dictionary so it can be sent to
    Celery workers.

    The result is the serialized model class plus an 'id' entry holding the
    instance's primary key as a string. Returns None for a falsy instance.
    """
    if not instance:
        return None
    payload = serialize_model(instance.__class__)
    # Use ``pk`` rather than ``id`` so models whose primary key field has a
    # different name can be serialized too. The dict key stays 'id'.
    payload['id'] = str(instance.pk)
    return payload


def deserialize_model_instance(
    serialized_instance: Optional[Dict[str, str]],
    *,
    select_for_update: bool = False
):
    """
    Load the model instance described by a dict from
    ``serialize_model_instance``.

    Returns None when the model cannot be determined, when the dict has no
    'id' entry, or when no row with that primary key exists.

    Pass select_for_update=True to lock the table row, preventing other
    threads from updating the instance while we work on it. Django requires
    such a query to run inside a transaction.
    """
    model = deserialize_model(serialized_instance)
    if not model:
        return None
    instance_pk = serialized_instance.get('id')
    if not instance_pk:
        return None
    # Filter on ``pk`` to mirror serialize_model_instance and to support
    # primary key fields that are not named ``id``.
    queryset = model.objects.filter(pk=instance_pk)
    if select_for_update:
        queryset = queryset.select_for_update()
    return queryset.first()
def has_been_indexed_since(instance: Model, *, timestamp: Optional[datetime]):
    """
    Tell whether ``instance`` was indexed after ``timestamp``.

    If instance has a truthy attribute indexed_at, it is used to determine
    whether the instance has been indexed between the time the task was
    queued and now. If it has been, reindexing is unnecessary.

    Returns False when there is no instance or no ``indexed_at`` value.
    Returns True when ``indexed_at`` is set but ``timestamp`` is empty (the
    instance had not been indexed when the task was queued). Otherwise
    returns ``indexed_at > timestamp``.

    ``timestamp`` may be a datetime or a string; strings are parsed with
    ``dateparse.parse_datetime``. A string that function cannot parse yields
    None, in which case the comparison raises ``TypeError``.
    """
    if not instance:
        return False
    last_indexed = getattr(instance, 'indexed_at', None)
    if not last_indexed:
        return False
    # From here on ``last_indexed`` is known to be set.
    if not timestamp:
        return True
    # isinstance also covers str subclasses, which would otherwise be
    # compared against a datetime unparsed.
    if isinstance(timestamp, str):
        timestamp = dateparse.parse_datetime(timestamp)
    return last_indexed > timestamp
def update_indexed_at(instance: Model, *, timestamp: Optional[datetime]):
    """
    Store ``timestamp`` in the instance's indexed_at column, if it has one.

    The write goes through a queryset ``update()`` filtered on the primary
    key instead of ``instance.save()``, so the save handler is not triggered
    again. This prevents a loop from occurring.

    Does nothing when the instance has no ``indexed_at`` attribute or its
    class has no ``objects`` attribute.
    """
    model_cls = instance.__class__
    if not (hasattr(instance, 'indexed_at') and hasattr(model_cls, 'objects')):
        return
    matching_rows = model_cls.objects.filter(pk=instance.pk)
    matching_rows.update(indexed_at=timestamp)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment