This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import orjson | |
from celery import shared_task | |
from django.db import transaction | |
from django.db.models import QuerySet | |
from typing import Dict, Type | |
from contextlib import contextmanager | |
import redis | |
from dataclasses import asdict |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SyncedModelMixin(models.Model): | |
last_sync_hashes = models.JSONField(default=dict) | |
def inside_task(instance): | |
data = self.get_update_data(instance) | |
rendered_data = self.render_data(data) | |
current_hash = hash(rendered_data) | |
syncer_name = self.__class__.__name__ | |
if current_hash != instance.last_sync_hashes.get(syncer_name): | |
# Perform the actual update |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PausedSyncIterator: | |
def __init__(self, queryset: QuerySet): | |
self.queryset = queryset | |
self.iterator = iter(queryset) | |
def __iter__(self): | |
return self | |
def __next__(self): | |
try: |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class HTTPBaseTask(Task): | |
def __init__(self): | |
super().__init__() | |
self.session = requests.Session() | |
self.session.headers.update({'Authorization': 'Bearer your_token_here'}) | |
class AttributeSyncer: | |
model = None |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AttributeSyncerTask(Task): | |
def __init__(self): | |
super().__init__() | |
self.session = requests.Session() | |
# Configure session as needed, e.g.: | |
self.session.headers.update({'Authorization': 'Bearer your_token_here'}) | |
def run(self, instance_id): | |
instance = self.get_instance(instance_id) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AttributeSyncer: | |
@classmethod | |
def process_update(cls, instance): | |
if created: | |
new_data = cls.get_update_data(instance) | |
cls.send_update(instance, new_data) | |
else: | |
old_data = cls.get_update_data(instance, original=True) | |
new_data = cls.get_update_data(instance) | |
diff = jsondiff.diff(old_data, new_data) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SyncedModelMixin: | |
last_synced_data = models.JSONField(null=True, blank=True) | |
class AttributeSyncer: | |
model = None | |
serializer_class = None | |
@classmethod | |
def process_update(cls, instance): | |
new_data = cls.get_update_data(instance) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AttributeSyncer: | |
queryset = None # To be defined in subclasses | |
@classmethod | |
def should_sync(cls, instance): | |
return cls.queryset.filter(pk=instance.pk).exists() | |
class CustomerAttributeSyncer(AttributeSyncer): | |
queryset = Customer.objects.filter(is_active=True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass, field | |
@dataclass | |
class Customer: | |
name: str | |
email: str | |
@dataclass | |
class Address: | |
street: str |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AttributeSyncer: | |
DEBOUNCE_DELAY = 60 # 60 seconds | |
LOCK_TIMEOUT = 300 # 5 minutes | |
@classmethod | |
def schedule_debounced_update(cls, instance): | |
model_name = instance.__class__.__name__ | |
instance_id = instance.id | |
lock_key = f"{model_name}:{instance_id}:lock" |
NewerOlder