Skip to content

Instantly share code, notes, and snippets.

@tomfa
Last active November 16, 2023 00:29
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomfa/4a389acfce393c2d7b54940e14613106 to your computer and use it in GitHub Desktop.
Save tomfa/4a389acfce393c2d7b54940e14613106 to your computer and use it in GitHub Desktop.
Django Elasticsearch with Celery
from celery import shared_task
from django.db import models, transaction
from django.utils import timezone
from django_elasticsearch_dsl.apps import DEDConfig
from django_elasticsearch_dsl.registries import registry
from django_elasticsearch_dsl.signals import BaseSignalProcessor
from .. import utils
class CelerySignalProcessor(BaseSignalProcessor):
    """Celery processor.

    Allows automatic updates on the index as delayed background tasks
    using Celery.

    NB: Processing deletes is still handled synchronously.
    By the time the Celery worker would pick up the delete job, the
    model instance would already be deleted.

    Use this Processor together with django_elasticsearch_dsl.
    See https://github.com/sabricot/django-elasticsearch-dsl

    Then use this worker by specifying in settings.py:

        ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = (
            '<your-project>.<your-app>.signals.CelerySignalProcessor'
        )
    """

    # Sentinel passed to the task when the model has no ``indexed_at``
    # attribute; tells the worker to skip the row-lock / dedupe logic.
    NO_INDEXED_AT_FIELD = -1

    # Seconds Celery waits before running the update task. Subclasses may
    # override this instead of editing handle_save().
    SAVE_TASK_COUNTDOWN = 10

    def handle_save(self, sender, instance, **kwargs):
        """Queue a delayed index update for a saved model instance.

        Does nothing when autosync is disabled or the instance's model is
        not registered with django_elasticsearch_dsl. Otherwise returns
        the result of ``apply_async`` for the queued task.
        """
        if not DEDConfig.autosync_enabled():
            return
        if instance.__class__ not in registry.get_models():
            return
        serialized_sender = utils.serialize_model(sender)
        serialized_instance = utils.serialize_model_instance(instance)
        return self.async_handle_save.apply_async(
            (
                serialized_sender,
                serialized_instance,
                # indexed_at as of this save (or the sentinel). The worker
                # compares it with the current DB value to skip work that
                # another task has already done.
                getattr(instance, 'indexed_at', self.NO_INDEXED_AT_FIELD),
            ),
            countdown=self.SAVE_TASK_COUNTDOWN,
        )

    @staticmethod
    def perform_elastic_update(instance):
        """Update the instance's documents and those of related models."""
        registry.update(instance)
        registry.update_related(instance)

    @shared_task(bind=True)
    def async_handle_save(self, sender, instance, indexed_at):
        """Celery task that (re)indexes a previously saved model instance.

        ``self`` is the bound Celery task, not a CelerySignalProcessor.
        ``sender`` and ``instance`` are the dicts built by
        ``utils.serialize_model`` / ``utils.serialize_model_instance``.
        ``indexed_at`` is the instance's ``indexed_at`` value at save time,
        or ``CelerySignalProcessor.NO_INDEXED_AT_FIELD``.
        """
        deserialized_sender = utils.deserialize_model(sender)  # noqa
        if indexed_at == CelerySignalProcessor.NO_INDEXED_AT_FIELD:
            instance = utils.deserialize_model_instance(instance)
            if instance is None:
                # The row no longer exists (e.g. deleted during the
                # countdown); deletes are handled synchronously by
                # handle_delete, so there is nothing to index here.
                return
            CelerySignalProcessor.perform_elastic_update(instance)
            return
        with transaction.atomic():
            time_of_retrieval = timezone.now()
            # Lock the row so concurrent tasks for the same instance are
            # serialized and can observe each other's indexed_at update.
            instance = utils.deserialize_model_instance(
                instance, select_for_update=True
            )
            if instance is None:
                # Row vanished before the task ran; see comment above.
                return
            if utils.has_been_indexed_since(instance, timestamp=indexed_at):
                return
            CelerySignalProcessor.perform_elastic_update(instance)
            utils.update_indexed_at(instance, timestamp=time_of_retrieval)

    def setup(self):
        # Listen to all model saves.
        models.signals.post_save.connect(self.handle_save)
        models.signals.post_delete.connect(self.handle_delete)

        # Use to manage related objects update
        models.signals.m2m_changed.connect(self.handle_m2m_changed)
        models.signals.pre_delete.connect(self.handle_pre_delete)

    def teardown(self):
        # Stop listening to the signals connected in setup().
        models.signals.post_save.disconnect(self.handle_save)
        models.signals.post_delete.disconnect(self.handle_delete)
        models.signals.m2m_changed.disconnect(self.handle_m2m_changed)
        models.signals.pre_delete.disconnect(self.handle_pre_delete)
from datetime import datetime
from typing import Dict, Optional
from django.db.models import Model
from django.db.models.base import ModelBase
from django.utils import dateparse
def serialize_model(model: Optional[ModelBase]) -> Optional[Dict[str, str]]:
    """Serialize a Django model class into a plain dict.

    The dict identifies the class by its app label and model name, so it
    can be sent to Celery workers as a task argument.

    Returns None when ``model`` is not a Django model class.
    """
    if isinstance(model, ModelBase):
        meta = model._meta
        return {'app': meta.app_label, 'model': meta.model_name}
    return None
def deserialize_model(
    serialized_model: Optional[Dict[str, str]]
) -> Optional[ModelBase]:
    """Return the Django model class described by ``serialized_model``.

    Inverse of ``serialize_model``: expects a dict with an ``'app'``
    (app label) and a ``'model'`` (model name) entry.

    Returns None when the dict is empty/None or either entry is missing or
    empty. Errors raised by ``apps.get_model`` (e.g. for an unknown model)
    propagate to the caller.
    """
    # Imported locally, presumably to avoid touching the app registry at
    # module import time -- NOTE(review): confirm this is still needed.
    from django.apps import apps

    if not serialized_model:
        return None
    app = serialized_model.get('app')
    model = serialized_model.get('model')
    if not (app and model):
        return None
    return apps.get_model(app, model)
def serialize_model_instance(
    instance: Optional[Model]
) -> Optional[Dict[str, str]]:
    """Serialize a Django model instance into a plain dict.

    The dict holds the model's ``'app'`` and ``'model'`` entries (as built
    by ``serialize_model``) plus the primary key, stringified, under
    ``'id'``. This lets the instance be sent to Celery workers as a task
    argument.

    Returns None when ``instance`` is falsy or its class is not a Django
    model class.
    """
    if not instance:
        return None
    serialized = serialize_model(instance.__class__)
    if serialized is None:
        # Not a Django model; nothing meaningful to serialize.
        return None
    # Use ``pk`` rather than ``id`` so models with a custom primary key
    # field work too; for default models ``pk`` is the ``id`` field. The
    # key stays 'id' because deserialize_model_instance reads it from there.
    serialized['id'] = str(instance.pk)
    return serialized
def deserialize_model_instance(
    serialized_instance: Optional[Dict[str, str]],
    *,
    select_for_update: bool = False
) -> Optional[Model]:
    """Fetch the model instance described by ``serialized_instance``.

    Inverse of ``serialize_model_instance``: the dict carries the model's
    app/model names and the primary key under ``'id'``.

    Set ``select_for_update=True`` to lock the table row, which prevents
    other transactions from updating the instance while we work on it.
    This only has an effect inside a transaction.

    Returns None if the model cannot be resolved, the dict has no primary
    key, or no matching row exists.
    """
    model = deserialize_model(serialized_instance)
    if not model:
        return None
    pk = serialized_instance.get('id')
    if not pk:
        return None
    # Prefer ``objects`` (as before); fall back to the model's default
    # manager for models that declare their manager under another name.
    manager = getattr(model, 'objects', None)
    if manager is None:
        manager = model._default_manager
    # Look up by ``pk`` so models with a custom primary key field work too.
    queryset = manager.filter(pk=pk)
    if select_for_update:
        queryset = queryset.select_for_update()
    return queryset.first()
def has_been_indexed_since(
    instance: Model, *, timestamp: Optional[datetime]
) -> bool:
    """Return True if ``instance`` has been indexed after ``timestamp``.

    ``timestamp`` is the instance's ``indexed_at`` value at the time the
    index task was queued. It is None if the instance had not been indexed
    yet, and possibly a string, depending on how the task arguments were
    serialized. If the instance has been indexed since then, reindexing is
    unnecessary.

    Returns False, so the caller reindexes, when:
    - ``instance`` is falsy;
    - it has no truthy ``indexed_at``;
    - ``timestamp`` is a string that ``dateparse.parse_datetime`` cannot
      parse. In that case it is safer to reindex than to skip an update.
    """
    if not instance:
        return False
    indexed_at = getattr(instance, 'indexed_at', None)
    if not indexed_at:
        return False
    if not timestamp:
        # Not indexed when the task was queued, but indexed now.
        return True
    if isinstance(timestamp, str):
        parsed = dateparse.parse_datetime(timestamp)
        if parsed is None:
            return False
        timestamp = parsed
    return indexed_at > timestamp
def update_indexed_at(instance: Model, *, timestamp: Optional[datetime]):
    """Write ``timestamp`` to the instance's ``indexed_at`` column.

    The write goes through ``QuerySet.update()`` rather than ``save()``, so
    the save handler is not triggered. This prevents a loop where every
    index update queues another one.

    Does nothing if ``instance`` has no ``indexed_at`` attribute or its
    class exposes no manager. Only the database row is updated; the
    in-memory ``instance`` is left unchanged.
    """
    if not hasattr(instance, 'indexed_at'):
        return
    model = instance.__class__
    # Prefer ``objects`` (as before); fall back to the model's default
    # manager so models whose manager has another name are updated too.
    manager = getattr(model, 'objects', None)
    if manager is None:
        manager = getattr(model, '_default_manager', None)
    if manager is None:
        return
    manager.filter(pk=instance.pk).update(indexed_at=timestamp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment