Skip to content

Instantly share code, notes, and snippets.

@gone
gone / v1.py
Created September 3, 2024 13:26
import time
import orjson
from celery import shared_task
from django.db import transaction
from django.db.models import QuerySet
from typing import Dict, Type
from contextlib import contextmanager
import redis
from dataclasses import asdict
class SyncedModelMixin(models.Model):
    """Model mixin that remembers the hash last synced for each syncer.

    NOTE(review): no ``Meta`` with ``abstract = True`` is visible in this
    snippet. If this class is meant to be mixed into other models, confirm
    that it is declared abstract where the full definition lives.
    """

    # JSON object that starts out empty (``default=dict`` gives every row its
    # own dict). Presumably keyed by syncer class name -- confirm against the
    # code that reads it.
    last_sync_hashes = models.JSONField(default=dict)
# Decide whether `instance` needs syncing by hashing its rendered update data
# and comparing the result with the hash stored on the instance.
# NOTE(review): `self` is not a parameter of this function, so it is presumably
# a closure defined inside a method; the enclosing scope is not visible here.
def inside_task(instance):
data = self.get_update_data(instance)
rendered_data = self.render_data(data)
# NOTE(review): the built-in hash() of str/bytes is salted per interpreter
# process unless PYTHONHASHSEED is fixed. If rendered_data is str or bytes, a
# value stored by one worker will not match in another; a hashlib digest
# would be stable. The type of rendered_data is not visible here -- confirm.
current_hash = hash(rendered_data)
# Stored hashes are looked up by the syncer's class name.
syncer_name = self.__class__.__name__
if current_hash != instance.last_sync_hashes.get(syncer_name):
# Perform the actual update
# NOTE(review): the snippet is cut off here; the body of this branch is not
# visible.
# Iterator wrapper around a QuerySet.
# NOTE(review): the snippet is cut off inside __next__, so what happens while
# advancing (and what "paused" refers to) is not visible here.
class PausedSyncIterator:
def __init__(self, queryset: QuerySet):
# Keep the queryset itself as well as an iterator created from it up front.
self.queryset = queryset
self.iterator = iter(queryset)
def __iter__(self):
# The object acts as its own iterator.
return self
def __next__(self):
try:
# NOTE(review): the body of this try block is missing from the snippet.
class HTTPBaseTask(Task):
    """Task base class that owns a ``requests.Session`` with an auth header.

    The session is created once in ``__init__`` and stored on ``self.session``.
    ``Task`` and ``requests`` are not imported in the visible part of the file;
    ``Task`` is presumably ``celery.Task`` -- confirm.
    """

    # Value sent as the Authorization header. Override it in a subclass rather
    # than editing __init__. The default is a placeholder, not a credential;
    # real tokens should come from configuration, not source code.
    authorization_header = 'Bearer your_token_here'

    def __init__(self):
        """Create the session and attach the Authorization header to it."""
        super().__init__()
        self.session = requests.Session()
        self.session.headers.update({'Authorization': self.authorization_header})
class AttributeSyncer:
    """Syncer base class with a ``model`` placeholder."""

    # Left as None here; presumably overridden by concrete syncers -- confirm.
    model = None
# Task class that keeps a requests session on the task object and loads the
# instance to work on by id.
# NOTE(review): `Task` and `requests` are not imported in the visible part of
# the file.
class AttributeSyncerTask(Task):
def __init__(self):
super().__init__()
# The session is created once, when the task object is constructed.
self.session = requests.Session()
# Configure session as needed, e.g.:
# NOTE(review): the token below is a placeholder, not a real credential.
self.session.headers.update({'Authorization': 'Bearer your_token_here'})
def run(self, instance_id):
# get_instance is not defined in this snippet.
# NOTE(review): the snippet appears to be cut off after this line; what is
# done with `instance` is not visible.
instance = self.get_instance(instance_id)
# Syncer variant that sends the full update data on one branch and computes a
# diff between old and new data on the other.
class AttributeSyncer:
@classmethod
def process_update(cls, instance):
# NOTE(review): `created` is neither a parameter nor a local of this method.
# Unless it is defined in a scope not shown here, this line raises NameError.
if created:
new_data = cls.get_update_data(instance)
cls.send_update(instance, new_data)
else:
# original=True presumably selects the pre-change state -- confirm against
# get_update_data, which is not visible here.
old_data = cls.get_update_data(instance, original=True)
new_data = cls.get_update_data(instance)
# NOTE(review): jsondiff is not among the visible imports, and the snippet
# ends here, so what is done with `diff` is not shown.
diff = jsondiff.diff(old_data, new_data)
class SyncedModelMixin:
    """Mixin declaring a nullable JSON field for the last synced data.

    NOTE(review): this class does not derive from ``models.Model``. Django
    only turns field attributes into columns when they are declared on a
    ``Model`` subclass (typically an abstract base class), so confirm this
    field is picked up by the models that use the mixin.
    """

    # Nullable and allowed to be blank in forms.
    last_synced_data = models.JSONField(null=True, blank=True)
# Syncer variant carrying `model` and `serializer_class` placeholders.
class AttributeSyncer:
# Both default to None; presumably set by concrete syncers -- confirm.
model = None
serializer_class = None
@classmethod
def process_update(cls, instance):
# get_update_data is not defined in this snippet.
# NOTE(review): the snippet is cut off after this line; how `new_data` is
# used is not visible.
new_data = cls.get_update_data(instance)
class AttributeSyncer:
    """Syncer base class whose ``queryset`` decides which instances sync."""

    queryset = None  # To be defined in subclasses

    @classmethod
    def should_sync(cls, instance):
        """Return whether ``instance`` is matched by ``cls.queryset``.

        Args:
            instance: Object exposing a ``pk`` attribute.

        Returns:
            The result of ``cls.queryset.filter(pk=instance.pk).exists()``.

        Raises:
            AttributeError: If ``queryset`` was never defined. The exception
                type is the same one the unguarded ``None.filter`` lookup
                raised; only the message is more explicit.
        """
        if cls.queryset is None:
            raise AttributeError(
                f"{cls.__name__}.queryset must be defined before calling "
                "should_sync()"
            )
        return cls.queryset.filter(pk=instance.pk).exists()
class CustomerAttributeSyncer(AttributeSyncer):
    """Syncer limited to customers whose ``is_active`` flag is true."""

    # `.filter()` only builds the queryset, so this class-level definition
    # runs no query by itself.
    queryset = Customer.objects.filter(is_active=True)
from dataclasses import dataclass, field
@dataclass
class Customer:
    """Plain data record holding a customer's name and email address."""

    name: str
    email: str
@dataclass
class Address:
    """Plain data record for an address.

    NOTE(review): only the ``street`` field is visible in this snippet; the
    definition may continue beyond what is shown.
    """

    street: str
@gone
gone / debounce.py
Last active September 2, 2024 18:30
class AttributeSyncer:
DEBOUNCE_DELAY = 60 # 60 seconds
LOCK_TIMEOUT = 300 # 5 minutes
@classmethod
def schedule_debounced_update(cls, instance):
model_name = instance.__class__.__name__
instance_id = instance.id
lock_key = f"{model_name}:{instance_id}:lock"