Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
WeRiot Django Timescale integration
import logging
from django.contrib.gis.db.backends.postgis.base import \
DatabaseWrapper as PostgisDBWrapper
from django.db import ProgrammingError
from .schema import TimescaleSchemaEditor
logger = logging.getLogger(__name__)
class DatabaseWrapper(PostgisDBWrapper):
SchemaEditorClass = TimescaleSchemaEditor
def prepare_database(self):
"""Prepare the configured database.
This is where we enable the `timescaledb` extension
if it isn't enabled yet."""
super().prepare_database()
with self.cursor() as cursor:
try:
cursor.execute('CREATE EXTENSION IF NOT EXISTS timescaledb')
except ProgrammingError: # permission denied
logger.warning(
'Failed to create "timescaledb" extension. '
'Usage of timescale capabilities might fail'
'If timescale is needed, make sure you are connected '
'to the database as a superuser '
'or add the extension manually.',
exc_info=True
)
from django.contrib.gis.db.backends.postgis.schema import PostGISSchemaEditor
from timescale.fields import TimescaleDateTimeField
class TimescaleSchemaEditor(PostGISSchemaEditor):
sql_add_hypertable = (
"SELECT create_hypertable("
"{table}, {partition_column}, "
"chunk_time_interval => interval {interval})"
)
sql_drop_primary_key = (
'ALTER TABLE {table} '
'DROP CONSTRAINT {pkey}'
)
def drop_primary_key(self, model):
"""
Hypertables can't partition if the primary key is not
the partition column.
So we drop the mandatory primary key django creates.
"""
db_table = model._meta.db_table
table = self.quote_name(db_table)
pkey = self.quote_name(f'{db_table}_pkey')
sql = self.sql_drop_primary_key.format(table=table, pkey=pkey)
self.execute(sql)
def create_hypertable(self, model, field):
"""
Create the hypertable with the partition column being the field.
"""
partition_column = self.quote_value(field.column)
interval = self.quote_value(field.interval)
table = self.quote_value(model._meta.db_table)
sql = self.sql_add_hypertable.format(
table=table, partition_column=partition_column, interval=interval
)
self.execute(sql)
def create_model(self, model):
super().create_model(model)
for field in model._meta.local_fields:
if not isinstance(field, TimescaleDateTimeField):
continue
self.drop_primary_key(model)
self.create_hypertable(model, field)
from django.db import models
class TimeBucket(models.Func):
function = 'time_bucket'
def __init__(self, expression, interval):
if not isinstance(interval, models.Value):
interval = models.Value(interval)
super().__init__(interval, expression)
from django.db.models import DateTimeField
class TimescaleDateTimeField(DateTimeField):
def __init__(self, *args, interval, **kwargs):
self.interval = interval
super().__init__(*args, **kwargs)
def deconstruct(self):
name, path, args, kwargs = super().deconstruct()
kwargs['interval'] = self.interval
return name, path, args, kwargs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment