Skip to content

Instantly share code, notes, and snippets.

Created Dec 31, 2020
What would you like to do?
Ecto shard.migrate
defmodule Mix.Tasks.Shard.Migrate do
use Mix.Task
require Ecto.Query
import Mix.Shard
@shortdoc "Runs the client schemas migrations"
@moduledoc """
Runs the pending migrations one or more client schemas.
## Examples
mix shard.migrate
mix shard.migrate -c 50
mix shard.migrate -n 3
mix shard.migrate --step 3
mix shard.migrate --to 20080906120000
mix shard.migrate -o dev -o alpha
## Command line options
* `--all` - run all pending migrations
* `--step`, `-n` - Run n number of pending migrations
* `--to` - Run all migrations up to and including version
* `--quiet` - Do not log migration commands
* `--log-sql` - Log the raw sql migrations are running
* `--concurrency`, `-c` - Set concurrency. Defaults to 20.
* `--only`, `-o` - Run for specified clients only.
@switches [
all: :boolean,
step: :integer,
to: :integer,
quiet: :boolean,
concurrency: :integer,
log_sql: :boolean,
only: [:string, :keep],
@aliases [
n: :step,
c: :concurrency,
o: :only,
@migrations_path "priv/shard/migrations"
@impl true
def run(args \\ []) do
Logger.configure(level: :info)
{opts, _} = OptionParser.parse! args, strict: @switches, aliases: @aliases
opts = if opts[:to] || opts[:step] || opts[:all],
do: opts,
else: Keyword.put(opts, :all, true)
opts = Keyword.put_new(opts, :concurrency, 20)
opts = Keyword.put(opts, :pool_size, opts[:concurrency] * 2) # Each migration needs two connections.
run(:up, opts)
def run(dir, opts) do
# If we just pass a path ("priv/shard/migrations") to, then we get
# code compilation errors because it's being run in Task.async_stream and multiple
# processes are trying to compile the files in that dir. Luckly,
# lets us pass in [{version, module}] so we are free to load them up ourselves.
migrations = load_migrations()
Ecto.Migrator.with_repo(Datastores.Repo, fn _repo ->
clients = get_specified_clients(opts)
with_shard_repos([pool_size: opts[:pool_size]], fn ->
Enum.each(clients, fn client ->
migrate_shard(client, migrations, dir, opts)
# Task.async_stream(clients, &migrate_shard(&1, migrations, dir, opts), concurrency: opts[:concurrency], timeout: :infinity)
# |> Enum.each(fn {:ok, _result} -> nil end)
def migrate_shard(client, migrations, dir, opts) do
repo = Datastores.Shard.csn_to_repo(client.csn)
prefix = Datastores.Shard.client_to_schema(client)
opts = Keyword.put(opts, :prefix, prefix)
Datastores.set(, migrations, dir, opts)
def load_migrations() do
|> path ->
[{module, _code}] = Code.compile_file(path)
[version, _trash] = path |> Path.basename() |> String.split("_", parts: 2)
version = String.to_integer(version)
{version, module}
|> Enum.sort()
# This is the equivalent to:
# Ecto.Migrator.with_repo(Datastores.Shard.Db01, fn _repo ->
# Ecto.Migrator.with_repo(Datastores.Shard.Db02, fn _repo ->
# Ecto.Migrator.with_repo(Datastores.Shard.Db03, fn _repo ->
# do_something()
# end)
# end)
# end)
def with_shard_repos(opts, f) do
Enum.reduce(Datastores.Shard.configured_repos(), f, fn repo, f ->
fn -> Ecto.Migrator.with_repo(repo, fn _repo -> f.() end, opts) end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment