Skip to content

Instantly share code, notes, and snippets.

@h4
Last active March 13, 2024 07:22
Show Gist options
  • Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Save h4/fc9b6d350544ff66491308b535762fee to your computer and use it in GitHub Desktop.
Setup alembic to work properly with PostgreSQL schemas
from __future__ import with_statement
from alembic import context
from sqlalchemy import engine_from_config, pool
from logging.config import fileConfig
from models import Base
config = context.config
fileConfig(config.config_file_name)
"""
Load models metadata. We should define schema in this class firstly,
or set schema implicit with `__table_args__ = {'schema' : 'test'}` in model class
"""
target_metadata = Base.metadata
def run_migrations_offline():
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url, target_metadata=target_metadata, literal_binds=True)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online():
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix='sqlalchemy.',
poolclass=pool.NullPool)
with connectable.connect() as connection:
"""
Configure migration context
1. Pass our models metadata
2. Set schema for alembic_version table
3. Load all available schemas
"""
context.configure(
connection=connection,
target_metadata=target_metadata,
version_table_schema=target_metadata.schema,
include_schemas=True
)
with context.begin_transaction():
"""
By default search_path is setted to "$user",public
that why alembic can't create foreign keys correctly
"""
context.execute('SET search_path TO public')
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
@dmugtasimov
Copy link

For myself I figured out this solution (after reviewing Alembic source code):

    database_schema = settings.database_schema


    def include_name_filter(name, type_, parent_names):
        if type_ == "schema":
            return name == database_schema

        return True

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            # The following 3 lines are required to support non-default
            # database schema for our database objects
            version_table_schema=database_schema,
            include_schemas=True,
            include_name=include_name_filter,
        )

        connection.execute(text(CREATE_SCHEMA_STATEMENT))

        with context.begin_transaction():
            context.run_migrations()

Setting search path did not work for me. It started finding alembic_version table, some foreign keys, etc.

@Luocy7
Copy link

Luocy7 commented Oct 18, 2023

here is my solution for work properly with PostgreSQL schema using asyncpg and asyncio in sqlalchemy.

what counts is like the official document and above said
configure filter object from specified schema

def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name == target_metadata.schema
    else:
        return True

context.configure(
        connection=connection,
        target_metadata=target_metadata,

        version_table_schema=target_metadata.schema,
        include_schemas=True,
        include_name=include_name
    )

@Ilyespes2017
Copy link

Ilyespes2017 commented Feb 16, 2024

After spending a lot of time struggling to get my alembic to work, with a schema_name other than "public", I've come up with the following solution:
[The only solution that worked for me on February 16, 2024]

def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.

    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(connection=connection, target_metadata=target_metadata)

        connection.execute(text('set search_path to "%s"' % settings.postgres_db_schema)) #  <-- The magic line

        with context.begin_transaction():
            context.run_migrations()

Hope this helps

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment