Skip to content

Instantly share code, notes, and snippets.

@hubertlepicki
Last active October 1, 2020 09:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hubertlepicki/8cc78b8167ef738eed9fc10b8ae1dcf3 to your computer and use it in GitHub Desktop.
Save hubertlepicki/8cc78b8167ef738eed9fc10b8ae1dcf3 to your computer and use it in GitHub Desktop.
defmodule UI.Application do
  @moduledoc """
  Application callback module for the UI app.

  The endpoint and the Absinthe subscription process are not listed as direct children
  of `UI.Supervisor`. Instead they are passed to `Infra.DelayedStartWatchdog` together
  with the checks returned by `DB.ReadyChecks.all/0` and the name of
  `UI.DelayedStartSupervisor`.
  """

  use Application

  @impl true
  def start(_type, _args) do
    # Handed to the watchdog instead of being supervised directly from boot.
    deferred = [UI.Endpoint, {Absinthe.Subscription, UI.Endpoint}]

    watchdog_opts = [
      children: deferred,
      for_name: UI.DelayedStartSupervisor,
      checks: DB.ReadyChecks.all()
    ]

    # The delayed-start supervisor is listed first so that it is already running
    # under its registered name by the time the watchdog is started.
    Supervisor.start_link(
      [
        {Infra.DelayedStartSupervisor, name: UI.DelayedStartSupervisor},
        {Infra.DelayedStartWatchdog, watchdog_opts}
      ],
      strategy: :one_for_one,
      name: UI.Supervisor
    )
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  @impl true
  def config_change(changed, _new, removed) do
    UI.Endpoint.config_change(changed, removed)
    :ok
  end
end
defmodule Infra.DelayedStartSupervisor do
  @moduledoc """
  A `DynamicSupervisor` that is started without any children.

  Children are added afterwards through `DynamicSupervisor.start_child/2`, addressed
  by the name given in the `:name` option.
  """

  use DynamicSupervisor
  require Logger

  @doc """
  Starts the supervisor.

  `opts[:name]` is passed through as the `:name` option of
  `DynamicSupervisor.start_link/3`.
  """
  def start_link(opts) do
    registered_name = opts[:name]
    DynamicSupervisor.start_link(__MODULE__, :ignore, name: registered_name)
  end

  # The init argument carries no information; the strategy is always :one_for_one.
  @impl true
  def init(_init_arg), do: DynamicSupervisor.init(strategy: :one_for_one)
end
defmodule Infra.DelayedStartWatchdog do
  @moduledoc """
  Starts a list of children in a named dynamic supervisor once all readiness checks pass.

  Use in conjunction with `Infra.DelayedStartSupervisor`. In your application callback
  module start a named `Infra.DelayedStartSupervisor` and then, after it, start this
  module and pass it:

    * `:for_name` - the name of the supervisor the children are started in,
    * `:checks` - a list of zero-arity functions, each returning a truthy value when ready,
    * `:children` - a list of child specs to start once every check passes,
    * `:name` - optional name to register the watchdog itself under.

  The first check runs 100ms after start. While any check fails, the watchdog polls again
  every 1s. Once all checks pass it starts the children in the supervisor; at this moment
  the work of this GenServer is done and it schedules no further polls.

  A child that reports `{:error, {:already_started, pid}}` is treated as started, so a
  restarted watchdog can finish a partially completed start instead of crashing again.

  ## Example

      delayed_children = [UI.Endpoint]

      children = [
        {Infra.DelayedStartSupervisor, name: UI.DelayedStartSupervisor},
        {Infra.DelayedStartWatchdog,
         children: delayed_children,
         for_name: UI.DelayedStartSupervisor,
         checks: DB.ReadyChecks.all()}
      ]

      opts = [strategy: :one_for_one, name: UI.Supervisor]
      Supervisor.start_link(children, opts)
  """

  use GenServer
  require Logger

  # Delay before the very first readiness check, in milliseconds.
  @initial_delay 100
  # check every second for readiness
  @poll_interval 1000

  @doc """
  Starts the watchdog.

  Reads `:checks`, `:children`, `:for_name` and `:name` from `opts`; see the module
  documentation for their meaning.
  """
  def start_link(opts) do
    GenServer.start_link(
      __MODULE__,
      %{checks: opts[:checks], children: opts[:children], for_name: opts[:for_name]},
      name: opts[:name]
    )
  end

  @impl true
  def init(state) do
    schedule_poll(@initial_delay)
    {:ok, state}
  end

  @impl true
  def handle_info(:check_ready, state) do
    if Enum.all?(state.checks, & &1.()) do
      Logger.info(
        "#{__MODULE__} starting children for #{state.for_name} as all checks have passed"
      )

      # No poll is scheduled on this branch: once the children are started the
      # watchdog stays alive but idle.
      Enum.each(state.children, &start_child!(state.for_name, &1))
    else
      Logger.warn(
        "#{__MODULE__} delaying start of children of #{state.for_name} as some checks have failed"
      )

      schedule_poll()
    end

    {:noreply, state}
  end

  def handle_info(msg, state) do
    Logger.warn("#{__MODULE__} received unexpected message that was ignored: #{inspect(msg)}")
    {:noreply, state}
  end

  # Starts a single child in the supervisor. Every non-error result of
  # DynamicSupervisor.start_child/2 counts as success, and so does a child that is
  # already running. Any other error raises, crashing the watchdog as a failed
  # `{:ok, _pid} =` match would.
  defp start_child!(supervisor, child) do
    case DynamicSupervisor.start_child(supervisor, child) do
      {:ok, _pid} ->
        :ok

      {:ok, _pid, _info} ->
        :ok

      :ignore ->
        :ok

      {:error, {:already_started, _pid}} ->
        :ok

      {:error, reason} ->
        raise "#{__MODULE__} failed to start child #{inspect(child)} " <>
                "in #{inspect(supervisor)}: #{inspect(reason)}"
    end
  end

  defp schedule_poll(delay \\ @poll_interval) do
    Process.send_after(self(), :check_ready, delay)
  end
end
defmodule DB.ReadyChecks do
  @moduledoc """
  Readiness checks for the primary and replica database repos.

  Each check issues several concurrent `SELECT pg_sleep(0.1)` queries against a repo
  and passes only when every one of them returns `{:ok, _}`.
  """

  require Logger

  # Number of concurrent probe queries issued by each check.
  @probe_count 10
  @probe_query "SELECT pg_sleep(0.1)"

  @doc """
  Returns the readiness checks as a list of zero-arity functions.

  Each function returns `true` when its check passes and `false` otherwise.
  """
  def all do
    [
      &check_primary/0,
      &check_replica/0
    ]
  end

  defp check_primary, do: probe("check_primary", fn -> DB.Repo end)

  defp check_replica, do: probe("check_replica", fn -> DB.Repo.replica() end)

  # Runs the probe query concurrently against the repo returned by `repo_fun`.
  # The repo is resolved inside the `try`, in each task, so an exception raised
  # while resolving it is logged and counted as a failed probe.
  defp probe(check_name, repo_fun) do
    concurrently(@probe_count, fn ->
      try do
        {:ok, _} = Ecto.Adapters.SQL.query(repo_fun.(), @probe_query)
        true
      rescue
        e ->
          Logger.error("#{__MODULE__}.#{check_name} check failed because of:")
          Logger.error("#{inspect(e)}")
          false
      end
    end)
  end

  # Runs `func` in exactly `processes_count` tasks and returns true only when all
  # of them return a truthy value. `List.duplicate/2` is used instead of a range:
  # `0..n` is inclusive and yields n + 1 tasks, and `1..0` would yield two.
  defp concurrently(processes_count, func) do
    func
    |> List.duplicate(processes_count)
    |> Enum.map(&Task.async/1)
    |> Enum.map(&Task.await/1)
    |> Enum.all?(& &1)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment