Skip to content

Instantly share code, notes, and snippets.

@Andrei-Pozolotin
Created February 18, 2020 18:16
Show Gist options
  • Save Andrei-Pozolotin/7bb44f13eeb26512d3c8196b14101f9d to your computer and use it in GitHub Desktop.
Save Andrei-Pozolotin/7bb44f13eeb26512d3c8196b14101f9d to your computer and use it in GitHub Desktop.
"""
"""
import tester
import logging
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy import engine_from_config
from alembic import context
logger = logging.getLogger(__name__)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
# config_file_name is None when Alembic is driven without an .ini file;
# fileConfig() cannot load a None path, so logging setup is skipped then.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
from tester.dbms.alembic import AlembicUnit

# AlembicUnit.target_metadata is what autogenerate compares the database to.
target_metadata = AlembicUnit.target_metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline():
    """
    Run migrations in 'offline' mode.

    The migration context is configured from the ``sqlalchemy.url`` main
    option only; no engine or connection is created in this function.
    Migrations run inside ``context.begin_transaction()``.
    """
    offline_options = dict(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """
    Run migrations in 'online' mode.

    Builds an engine from the ``sqlalchemy.``-prefixed options of the active
    ini section, opens a connection, attaches the AlembicUnit statement
    interceptors to it and runs the migrations inside a transaction with
    ``search_path`` set to ``public``.
    """

    def process_revision_directives(context, revision, directives):
        """Discard the generated revision when autogenerate found no changes."""
        # cmd_opts is None when Alembic is invoked programmatically, so read
        # the flag defensively instead of assuming the attribute exists.
        if getattr(config.cmd_opts, "autogenerate", False):
            script = directives[0]
            if script.upgrade_ops.is_empty():
                # emptying the list in place suppresses the revision file
                directives[:] = []

    engine = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with engine.connect() as connection:
        # listeners must be attached before any migration statement executes
        AlembicUnit.attach_intercept(connection)
        context.configure(
            compare_type=True,
            include_schemas=True,
            connection=connection,
            target_metadata=target_metadata,
            process_revision_directives=process_revision_directives,
        )
        with context.begin_transaction():
            context.execute('SET search_path TO public')
            context.run_migrations()
# Executed when Alembic loads this module: pick the runner matching the mode.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()
"""
schema evolution support
"""
from contextvars import ContextVar
import sqlalchemy as sa
from alembic.ddl import base as ale_ddl
from sqlalchemy import engine
from sqlalchemy import event
from sqlalchemy.sql import ddl as sa_ddl
from tester.dbms.filer.base import ClusterFilerDBMS
from tester.dbms.model import ModelDBMS
from tester.dbms.tracker.base import ClusterTrackerDBMS
# Name of the table targeted by the DDL clause currently being executed.
# Written by the "before_execute" listener registered in
# AlembicUnit.attach_intercept and read by AlembicUnit.replicate_statement;
# None when the current clause is not a DDL element.
alembic_table_name = ContextVar("alembic_table_name", default=None)
# __visit_name__ of that same DDL clause (compared against "create_table"
# and "drop_table" in AlembicUnit.replicate_statement).
alembic_visit_name = ContextVar("alembic_visit_name", default=None)
class AlembicUnit:
    "schema evolution support"

    # metadata objects exposed to env.py as ``target_metadata``
    target_metadata = [
        ModelDBMS.metadata,
        ClusterFilerDBMS.metadata,
        ClusterTrackerDBMS.metadata,
    ]

    @classmethod
    def report_mapping(cls, source: str, target: str):
        "render generated statement changes"
        print("==============")
        print("--- source ---")
        print(source)
        print("--- target ---")
        print(target)
        print("--------------")

    @classmethod
    def replicate_statement(cls, source: str) -> str:
        """
        replicate DDL statement throughout cluster

        Returns ``source`` unchanged when no table name is recorded in
        ``alembic_table_name``. Otherwise wraps it in a call to
        ``pglogical_replicate_create`` / ``_delete`` / ``_modify`` chosen by
        the recorded visit name, and prints the mapping.
        """
        table_name = alembic_table_name.get()
        if not table_name:
            return source

        visit_name = alembic_visit_name.get()
        set_name = "default"
        if visit_name == "create_table":
            function = "pglogical_replicate_create"
        elif visit_name == "drop_table":
            function = "pglogical_replicate_delete"
        else:
            function = "pglogical_replicate_modify"

        # the name is embedded in a single-quoted SQL literal: double any quote
        relation = table_name.replace("'", "''")
        # NOTE: source is embedded in a $$-quoted literal, so a statement that
        # itself contains "$$" would terminate that literal early.
        target = f"SELECT {function}('{set_name}','{relation}',$${source}$$)"
        cls.report_mapping(source, target)
        return target

    @classmethod
    def attach_intercept(cls, connection: engine.Connection):
        """
        setup ddl statement cluster replication

        Registers two listeners on ``connection``: "before_execute" records
        the table name and visit name of each DDL clause in the module-level
        context variables, and "before_cursor_execute" rewrites the statement
        through ``replicate_statement``.
        """

        @event.listens_for(connection, "before_execute", retval=True)
        def intercept_clause(conn, clause, multi_param, param):
            "extract table name and command type"
            if isinstance(clause, sa_ddl.DDLElement):
                visit_name = clause.__visit_name__
                # Check AlterTable first: its subclasses carry ``table_name``
                # and no ``element``, yet a visit name such as "rename_table"
                # would otherwise be caught by the "table" substring test.
                if isinstance(clause, ale_ddl.AlterTable):
                    table_name = clause.table_name
                elif "table" in visit_name:
                    table_name = clause.element.name
                elif "index" in visit_name:
                    table_name = clause.element.table.name
                else:
                    # explicit raise: a bare assert is stripped under -O
                    raise AssertionError(
                        f"no type: visit_name={visit_name} clause={clause} clause_type={type(clause)}"
                    )
                if not isinstance(table_name, str):
                    raise AssertionError(f"no text: {table_name} {type(table_name)}")
                alembic_table_name.set(table_name)
                alembic_visit_name.set(visit_name)
            else:
                # non-DDL clause: clear state so it is passed through untouched
                alembic_table_name.set(None)
                alembic_visit_name.set(None)
            return clause, multi_param, param

        @event.listens_for(connection, "before_cursor_execute", retval=True)
        def intercept_statement(conn, cursor, statement, parameters, context, executemany):
            "apply cluster replication command"
            statement = cls.replicate_statement(statement)
            return statement, parameters
---
--
-- Build the SQL text that adds a table to a pglogical replication set.
-- Returns that statement for relations whose name starts with 'cluster_',
-- and an empty string for every other relation.
--
CREATE OR REPLACE FUNCTION pglogical_table_register(
    set_name text, relation text
)
RETURNS text LANGUAGE plpgsql AS $FUN$
BEGIN
    -- Plain prefix comparison: in a LIKE pattern '_' matches any single
    -- character, so 'cluster_%' would also accept e.g. 'clusterXfoo'.
    IF left(relation, 8) = 'cluster_' THEN
        -- %L renders each argument as a properly quoted SQL literal.
        RETURN format($$
            SELECT pglogical.replication_set_add_table(
                set_name := %L, relation := %L
            );
        $$, set_name, relation);
    ELSE
        RETURN '';
    END IF;
END;
$FUN$;
---
--
-- Build the SQL text that removes a table from a pglogical replication set.
-- Returns that statement for relations whose name starts with 'cluster_',
-- and an empty string for every other relation.
--
CREATE OR REPLACE FUNCTION pglogical_table_unregister(
    set_name text, relation text
)
RETURNS text LANGUAGE plpgsql AS $FUN$
BEGIN
    -- Plain prefix comparison: in a LIKE pattern '_' matches any single
    -- character, so 'cluster_%' would also accept e.g. 'clusterXfoo'.
    IF left(relation, 8) = 'cluster_' THEN
        -- %L renders each argument as a properly quoted SQL literal.
        RETURN format($$
            SELECT pglogical.replication_set_remove_table(
                set_name := %L, relation := %L
            );
        $$, set_name, relation);
    ELSE
        RETURN '';
    END IF;
END;
$FUN$;
--
-- invoke sql on this node
--
CREATE OR REPLACE FUNCTION pglogical_replicate_invoke(
    command text
)
RETURNS void LANGUAGE plpgsql AS $FUN$
DECLARE
    -- values of the cluster.ident_min / cluster.ident_max settings
    ident_lo text := current_setting('cluster.ident_min');
    ident_hi text := current_setting('cluster.ident_max');
    expanded text;
BEGIN
    -- substitute the {cluster.ident_min} / {cluster.ident_max} placeholders
    expanded := replace(
        replace(command, '{cluster.ident_min}', ident_lo),
        '{cluster.ident_max}', ident_hi
    );
    -- run the expanded command with search_path set to public
    EXECUTE format($$
SET search_path TO public;
%s;
$$, expanded);
END;
$FUN$;
--
-- replicate invoke command
--
--
-- Wrap `command` in a call to pglogical_replicate_invoke and hand it to
-- pglogical.replicate_ddl_command for the given replication set.
-- `relation` is accepted for signature parity with the create/delete
-- variants and is not used here.
--
CREATE OR REPLACE FUNCTION pglogical_replicate_modify(
    set_name text, relation text, command text
)
RETURNS void LANGUAGE plpgsql AS $FUN$
DECLARE
    -- %L embeds the command as a quoted SQL literal; wrapping it in a fixed
    -- $$ ... $$ pair would break on any command that itself contains "$$".
    command_modify text := format($SQL$
        SET search_path TO public;
        SELECT pglogical_replicate_invoke(%L);
    $SQL$, command);
BEGIN
    PERFORM pglogical.replicate_ddl_command(
        command := command_modify, replication_sets := ARRAY[set_name]
    );
END;
$FUN$;
---
CREATE OR REPLACE FUNCTION pglogical_replicate_create(
    set_name text, relation text, command text
)
RETURNS void LANGUAGE plpgsql AS $FUN$
DECLARE
    -- statement (possibly empty) that adds the table to the replication set
    register_sql text := pglogical_table_register(set_name, relation);
    command_create text;
BEGIN
    -- run the caller's command first, then the registration statement
    command_create := format($$
%s; %s;
$$, command, register_sql);
    PERFORM pglogical_replicate_modify(set_name, relation, command_create);
END;
$FUN$;
---
CREATE OR REPLACE FUNCTION pglogical_replicate_delete(
    set_name text, relation text, command text
)
RETURNS void LANGUAGE plpgsql AS $FUN$
DECLARE
    -- statement (possibly empty) that removes the table from the replication set
    unregister_sql text := pglogical_table_unregister(set_name, relation);
    command_delete text;
BEGIN
    -- run the unregistration statement first, then the caller's command
    command_delete := format($$
%s; %s;
$$, unregister_sql, command);
    PERFORM pglogical_replicate_modify(set_name, relation, command_delete);
END;
$FUN$;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment