-
-
Save cjbottaro/8cfe8b3961030e2377ef62ade48bd3a5 to your computer and use it in GitHub Desktop.
Ecto shard.migrate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Mix.Tasks.Shard.Migrate do
  use Mix.Task

  require Ecto.Query
  import Mix.Shard

  @shortdoc "Runs the client schemas migrations"

  @moduledoc """
  Runs the pending migrations for one or more client schemas.

  ## Examples

      mix shard.migrate
      mix shard.migrate -c 50
      mix shard.migrate -n 3
      mix shard.migrate --step 3
      mix shard.migrate --to 20080906120000
      mix shard.migrate -o dev -o alpha

  ## Command line options

    * `--all` - run all pending migrations
    * `--step`, `-n` - Run n number of pending migrations
    * `--to` - Run all migrations up to and including version
    * `--quiet` - Do not log migration commands
    * `--log-sql` - Log the raw sql migrations are running
    * `--concurrency`, `-c` - Set concurrency. Defaults to 20. Shards are
      currently migrated one after another; this value only sizes each shard
      repo's connection pool (twice the concurrency).
    * `--only`, `-o` - Run for specified clients only.

  """

  @switches [
    all: :boolean,
    step: :integer,
    to: :integer,
    quiet: :boolean,
    concurrency: :integer,
    log_sql: :boolean,
    only: [:string, :keep]
  ]

  @aliases [
    n: :step,
    c: :concurrency,
    o: :only
  ]

  @migrations_path "priv/shard/migrations"

  @doc """
  Mix task entry point: parses `args` and runs the pending migrations upwards.

  Defaults applied to the parsed options:

    * `:all` is set to `true` unless one of `--to`, `--step` or `--all` was given;
    * `--quiet` is translated to `log: false` for `Ecto.Migrator.run/4`;
    * `:concurrency` defaults to `20`;
    * `:pool_size` is always set to twice the concurrency.

  Returns `:ok`. Raises a `Mix.Error` when any of the repos could not be
  started, and `OptionParser.ParseError` on invalid switches.
  """
  @impl true
  def run(args \\ []) do
    Logger.configure(level: :info)

    {opts, _} = OptionParser.parse!(args, strict: @switches, aliases: @aliases)

    opts =
      if opts[:to] || opts[:step] || opts[:all],
        do: opts,
        else: Keyword.put(opts, :all, true)

    # Ecto.Migrator.run/4 does not know a `:quiet` option; like the stock
    # `mix ecto.migrate` task, translate it into `log: false`.
    opts = if opts[:quiet], do: Keyword.put(opts, :log, false), else: opts

    opts = Keyword.put_new(opts, :concurrency, 20)
    # Each migration needs two connections.
    opts = Keyword.put(opts, :pool_size, opts[:concurrency] * 2)

    # with_repo/3 returns {:error, term} instead of calling the function when a
    # repo cannot be started; fail loudly rather than reporting success.
    :up |> run(opts) |> ensure_repos_started!()

    :ok
  end

  @doc """
  Runs the migrations in direction `dir` for every selected client.

  Starts `Datastores.Repo`, resolves the clients via `get_specified_clients/1`
  (not defined in this module; presumably imported from `Mix.Shard`), starts all
  shard repos with the pool size from `opts[:pool_size]`, and migrates each
  client in turn.

  Returns the result of the outer `Ecto.Migrator.with_repo/3` call.
  """
  def run(dir, opts) do
    # If we just pass a path ("priv/shard/migrations") to Ecto.Migrator.run, then we get
    # code compilation errors because it's being run in Task.async_stream and multiple
    # processes are trying to compile the files in that dir. Luckily, Ecto.Migrator.run
    # lets us pass in [{version, module}] so we are free to load them up ourselves.
    migrations = load_migrations()

    Ecto.Migrator.with_repo(Datastores.Repo, fn _repo ->
      clients = get_specified_clients(opts)

      with_shard_repos([pool_size: opts[:pool_size]], fn ->
        Enum.each(clients, fn client ->
          migrate_shard(client, migrations, dir, opts)
        end)

        # Concurrent variant, currently disabled (shards are migrated sequentially).
        # NOTE: the Task.async_stream option is `:max_concurrency`, not `:concurrency`.
        # Task.async_stream(clients, &migrate_shard(&1, migrations, dir, opts), max_concurrency: opts[:concurrency], timeout: :infinity)
        # |> Enum.each(fn {:ok, _result} -> nil end)
      end)
    end)
  end

  @doc """
  Runs `migrations` in direction `dir` for a single `client`.

  The repo is looked up from `client.csn` and the schema prefix from the client
  itself; the prefix is passed to `Ecto.Migrator.run/4` as the `:prefix` option.
  """
  def migrate_shard(client, migrations, dir, opts) do
    repo = Datastores.Shard.csn_to_repo(client.csn)
    prefix = Datastores.Shard.client_to_schema(client)
    opts = Keyword.put(opts, :prefix, prefix)

    # NOTE(review): presumably selects the active datastore for the current
    # process before migrating - confirm against Datastores.set/1.
    Datastores.set(client.name)

    Ecto.Migrator.run(repo, migrations, dir, opts)
  end

  @doc """
  Compiles every `.exs` file under the shard migrations directory.

  Returns a list of `{version, module}` tuples sorted ascending, where `version`
  is the integer prefix of the file name (the part before the first `_`).
  Each file is expected to define exactly one module.
  """
  @spec load_migrations() :: [{integer(), module()}]
  def load_migrations() do
    Path.wildcard("#{@migrations_path}/**/*.exs")
    |> Enum.map(fn path ->
      [{module, _code}] = Code.compile_file(path)
      [version, _trash] = path |> Path.basename() |> String.split("_", parts: 2)
      version = String.to_integer(version)
      {version, module}
    end)
    |> Enum.sort()
  end

  # This is the equivalent to:
  #   Ecto.Migrator.with_repo(Datastores.Shard.Db03, fn _repo ->
  #     Ecto.Migrator.with_repo(Datastores.Shard.Db02, fn _repo ->
  #       Ecto.Migrator.with_repo(Datastores.Shard.Db01, fn _repo ->
  #         do_something()
  #       end)
  #     end)
  #   end)
  # (each repo wraps the function built so far, so the first configured repo
  # ends up innermost and the last one outermost).
  @doc """
  Calls the zero-arity function `f` with every configured shard repo started.

  `opts` is forwarded to each `Ecto.Migrator.with_repo/3` call. Returns the
  result of the outermost `with_repo/3` call.
  """
  def with_shard_repos(opts, f) do
    Enum.reduce(Datastores.Shard.configured_repos(), f, fn repo, f ->
      fn -> Ecto.Migrator.with_repo(repo, fn _repo -> f.() end, opts) end
    end).()
  end

  # Walks the nested `{:ok, result, apps}` tuples produced by the stacked
  # with_repo/3 calls and raises on the first `{:error, reason}` found.
  defp ensure_repos_started!({:ok, inner, _apps}), do: ensure_repos_started!(inner)

  defp ensure_repos_started!({:error, reason}) do
    Mix.raise("Could not start repo, error: #{inspect(reason)}")
  end

  defp ensure_repos_started!(_result), do: :ok
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment