Skip to content

Instantly share code, notes, and snippets.

@yucer-elbt
Created August 21, 2022 09:26
Show Gist options
  • Save yucer-elbt/da6d3c177a1f00787952dffda4ff5eb9 to your computer and use it in GitHub Desktop.
Save yucer-elbt/da6d3c177a1f00787952dffda4ff5eb9 to your computer and use it in GitHub Desktop.
Python mixin for SQLAlchemy: automatic weekly table partitioning (PostgreSQL)
import logging
from sqlalchemy.sql import text
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
class ArchivableMixin:
    """Mixin that adds automatic weekly partitioning DDL to a model (PostgreSQL).

    ``create_extra`` installs, for the model's table:

    * a "live" view ``v_<table>_live`` restricted to recent rows,
    * a ``BEFORE INSERT`` row trigger that redirects each row into a child
      table named ``<table>_IYYY_IW`` (ISO year / ISO week), creating the
      child table on demand, and
    * a unique index on ``(<partition field>::date, id)``.

    Class attributes:
        _union_cols: not used by the methods of this mixin.
        _union_json_cols: not used by the methods of this mixin.
        _partition_field: name of the date/timestamp column used to pick the
            weekly partition.
    """

    _union_cols = []
    _union_json_cols = []  # they need one special typecast
    _partition_field = 'created_at'

    @classmethod
    def create_extra(cls, engine):
        """Create the live view, partition trigger and date index.

        Calls the next ``create_extra`` in the MRO first, then executes the
        generated statements in this order: live view, trigger function,
        trigger, date index.

        Args:
            engine: object exposing ``execute(clause)``.
                NOTE(review): ``Engine.execute`` no longer exists in
                SQLAlchemy 2.0 -- confirm the SQLAlchemy version in use.
        """
        super().create_extra(engine)
        src_table = cls.__tablename__
        # Remembered on the class; get_default_live_view_def reads it back.
        cls.__liveview_name__ = f"v_{src_table}_live"
        part_field = cls._partition_field
        statements = (
            cls.get_default_live_view_def(src_table, part_field),
            # Called with one argument so overrides that keep the original
            # signature still work; the partition column is resolved from
            # cls._partition_field inside the method.
            cls.get_create_automanage_fn_trg_def(src_table),
            cls.get_enable_automanage_trg_def(src_table),
            cls.get_date_index_def(src_table, part_field),
        )
        for statement in statements:
            engine.execute(text(statement))

    @classmethod
    def get_create_automanage_fn_trg_def(cls, src_table, part_field=None):
        """Return the SQL defining the partition-routing trigger function.

        The generated PL/pgSQL function ``trg_<src_table>_partition``:

        * derives the child table name ``<src_table>_IYYY_IW`` from the
          partition column of the inserted row,
        * creates that child table (inheriting from ``src_table`` with a
          one-week CHECK constraint) when it is missing from schema
          ``public``,
        * when the new partition covers ``current_date``, redefines
          ``v_<src_table>_live`` to span from one day before the week start
          up to the week end,
        * inserts the row into the child table and returns NULL, so the row
          is not also stored in the parent table.

        Args:
            src_table: name of the parent table.
            part_field: name of the partition column. Defaults to
                ``cls._partition_field`` so the function agrees with the
                view and index created by ``create_extra``.

        Returns:
            The ``CREATE OR REPLACE FUNCTION`` statement as a string.
        """
        if part_field is None:
            part_field = cls._partition_field
        return f"""
CREATE OR REPLACE FUNCTION trg_{src_table}_partition()
RETURNS trigger AS
$func$
DECLARE
_tbl text := to_char(NEW.{part_field}, '"{src_table}_"IYYY_IW');
_min_date date := date_trunc('week', NEW.{part_field})::date;
_max_date date := date_trunc('week', NEW.{part_field})::date + 7;
_min_live date := date_trunc('week', NEW.{part_field})::date - 1;
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relname = _tbl
AND c.relkind = 'r') THEN
EXECUTE format('CREATE TABLE IF NOT EXISTS %I
(CHECK ({part_field}::date >= to_date(%L, ''yyyy-mm-dd'') AND
{part_field}::date < to_date(%L, ''yyyy-mm-dd'')),
LIKE {src_table} INCLUDING INDEXES )
INHERITS ({src_table})'
, _tbl
, to_char(_min_date, 'YYYY-MM-DD')
, to_char(_max_date, 'YYYY-MM-DD')
);
IF (current_date >= _min_date) and (current_date < _max_date) THEN
EXECUTE format('CREATE OR REPLACE VIEW v_{src_table}_live AS (
SELECT * FROM {src_table}
WHERE ({part_field}::date >= to_date(%L, ''yyyy-mm-dd'') AND
{part_field}::date < to_date(%L, ''yyyy-mm-dd''))
)'
, to_char(_min_live, 'YYYY-MM-DD')
, to_char(_max_date, 'YYYY-MM-DD')
);
END IF;
END IF;
EXECUTE 'INSERT INTO ' || quote_ident(_tbl) || ' VALUES ($1.*)'
USING NEW;
RETURN NULL;
END
$func$ LANGUAGE plpgsql SET search_path = public;
"""

    @classmethod
    def get_enable_automanage_trg_def(cls, src_table):
        """Return the SQL that (re)creates the ``BEFORE INSERT`` row trigger.

        The trigger ``ins_<src_table>`` is dropped if present and recreated
        to call ``trg_<src_table>_partition()`` for each inserted row.

        Args:
            src_table: name of the parent table.

        Returns:
            The ``DROP TRIGGER`` / ``CREATE TRIGGER`` statements as a string.
        """
        return f"""
DROP TRIGGER IF EXISTS ins_{src_table} on {src_table};
CREATE TRIGGER ins_{src_table}
BEFORE INSERT ON {src_table}
FOR EACH ROW EXECUTE PROCEDURE trg_{src_table}_partition();
"""

    @classmethod
    def get_date_index_def(cls, src_table, date_field):
        """Return the SQL that (re)creates the unique date index.

        The index ``<src_table>_<date_field>_part_idx`` covers
        ``(<date_field>::date DESC NULLS LAST, id)``.

        Args:
            src_table: name of the table to index.
            date_field: name of the date/timestamp column.

        Returns:
            The ``drop index`` / ``create unique index`` statements as a
            string.
        """
        return f"""
drop index if exists {src_table}_{date_field}_part_idx;
create unique index {src_table}_{date_field}_part_idx
ON {src_table}(({date_field}::date) DESC NULLS LAST, id);
"""

    @classmethod
    def get_default_live_view_def(cls, src_table, date_field):
        """Return the SQL for the initial "live" view over recent rows.

        The view selects rows whose ``date_field`` date is on or after
        ``current_date - 1``. The original author notes that the planner
        cannot optimize this query because ``current_date`` is dynamic.

        Args:
            src_table: name of the table the view selects from.
            date_field: name of the date/timestamp column to filter on.

        Returns:
            The ``CREATE OR REPLACE VIEW`` statement as a string.
        """
        # create_extra stores the view name on the class; fall back to the
        # same naming scheme so this method also works when called on its own.
        view_name = (
            getattr(cls, "__liveview_name__", None) or f"v_{src_table}_live"
        )
        return f"""
CREATE OR REPLACE VIEW {view_name} AS
(
SELECT * FROM {src_table}
WHERE ({date_field}::date >= current_date - 1)
);
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment