Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Elixir runtime-controlled supervision tree using feature flags (circuit breaker)

These snippets provide a foundation for starting and stopping supervision trees at runtime using feature flags (e.g. Launch Darkly).

Some things to note when adapting these snippets:

  1. application.ex needs to be adapted into an existing application. The important part is that each child spec provided is compliant, and that there is a feature flag (ld_key) specified.
  2. As written, if a feature flag fails for some reason, it defaults to starting all children. There is room for adaptation here as needed.
  3. This implementation still requires a FeatureFlags module that implements is_on?/2. Adjust as needed to accommodate your own feature flag setup.
# How we start the Circuit Breaker, including the list of child specs
# and their corresponding feature flags to watch. This needs to be adapted into an existing application.ex file.
defmodule MyApp.Application do
  @moduledoc false

  use Application

  # Boots the top-level supervisor. The CircuitBreaker entry is the only part
  # specific to this example; merge it into your own child list.
  @impl true
  def start(_type, _args) do
    Supervisor.start_link(children(), strategy: :one_for_one, name: MyApp.Supervisor)
  end

  # Children of the application supervisor.
  defp children do
    [
      # ...
      {CircuitBreaker, children: flagged_children()}
      # ...
    ]
  end

  # Child specs handed to the circuit breaker, each tied to the feature flag
  # that controls whether it should be running.
  defp flagged_children do
    [
      flagged_supervisor(SomeService.Supervisor, "some-feature-flag-on-launch-darkly"),
      flagged_supervisor(AnotherService.Supervisor, "a-different-feature-flag")
    ]
  end

  # Builds a child spec map for `module`, carrying its feature flag under
  # the extra `:ld_key` entry.
  defp flagged_supervisor(module, ld_key) do
    %{
      id: module,
      start: {module, :start_link, [[]]},
      restart: :temporary,
      ld_key: ld_key
    }
  end
end
defmodule CircuitBreaker do
  @moduledoc """
  Supervisor for starting/stopping/restarting supervisors using feature flags.

  If a child completely shuts down, it remains shut down until the entire
  application restarts, or the Launch Darkly flag is cycled.

  Two children are started, in this order:

    1. a `DynamicSupervisor` registered as `CircuitBreaker.DynamicSupervisor`
    2. `CircuitBreaker.Monitor`, which receives the `:children` specs

  ## Options

    * `:children` - list of child specs handed to `CircuitBreaker.Monitor`.
      Defaults to `[]`.
  """

  use Supervisor

  @doc """
  Starts the circuit breaker supervisor, registered under `#{inspect(__MODULE__)}`.
  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(init_arg) do
    monitored_children = Keyword.get(init_arg, :children, [])

    children = [
      {DynamicSupervisor, name: __MODULE__.DynamicSupervisor, strategy: :one_for_one},
      {__MODULE__.Monitor, children: monitored_children}
    ]

    # :rest_for_one so that a restart of the DynamicSupervisor also restarts
    # the Monitor (which is listed after it). Otherwise the Monitor keeps
    # state that refers to processes of the previous DynamicSupervisor
    # instance. A Monitor crash alone still restarts only the Monitor.
    Supervisor.init(children, strategy: :rest_for_one)
  end

  @doc """
  Sends `{:add_child, spec}` to `CircuitBreaker.Monitor` with `GenServer.call/2`
  and returns its reply.
  """
  @spec add_child(map()) :: term()
  def add_child(spec) do
    GenServer.call(__MODULE__.Monitor, {:add_child, spec})
  end

  @doc """
  Delegates to `CircuitBreaker.Monitor.pid_for_spec/1`.
  """
  defdelegate pid_for_spec(spec), to: __MODULE__.Monitor

  @doc """
  Delegates to `CircuitBreaker.Monitor.spec_to_id/1`.
  """
  defdelegate spec_to_id(spec), to: __MODULE__.Monitor
end
defmodule CircuitBreaker.Monitor do
  @moduledoc """
  Handles monitoring and starting/stopping processes supervised
  under our circuit breaker.

  On every poll each configured child spec is checked against three facts:
  whether its feature flag (`:ld_key`) is on, whether the child was recorded
  as crashed, and whether it is currently running under
  `CircuitBreaker.DynamicSupervisor`. The child is then started, stopped or
  left alone accordingly.

  ## Options

    * `:children` - (required) list of child spec maps, each with an `:ld_key`.
    * `:poll_interval` - milliseconds between polls. Defaults to `1_000`.
      Any non-integer value (e.g. `nil`) disables re-polling after the initial
      check.
  """

  use GenServer

  require Logger

  alias FeatureFlags

  @dynamic_supervisor CircuitBreaker.DynamicSupervisor
  @poll_interval 1_000

  defstruct children: [],
            ref_to_id: %{},
            id_to_ref: %{},
            crashes: %{},
            poll_interval: @poll_interval

  ## Client API

  @doc """
  Starts the monitor, registered under its module name.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @doc """
  Returns the pid of the most recently started child of the dynamic supervisor
  whose module matches `spec`, or `nil` when there is none.

  Entries that are not backed by a live pid (e.g. `:restarting`) are skipped.
  """
  @spec pid_for_spec(map() | atom()) :: pid() | nil
  def pid_for_spec(spec) do
    id = spec_to_id(spec)

    @dynamic_supervisor
    |> DynamicSupervisor.which_children()
    # Always return the last created process, helpful for race conditions
    |> Enum.reverse()
    |> Enum.find_value(fn
      {_, pid, _, [^id]} when is_pid(pid) -> pid
      _other -> nil
    end)
  end

  @doc """
  Returns the identifier used to track a child.

  For a spec with a `:start` MFA this is the module of the MFA (the value the
  dynamic supervisor reports in `which_children/1`); otherwise the `:id` of the
  spec; a bare atom is returned unchanged.
  """
  @spec spec_to_id(map() | atom()) :: term()
  def spec_to_id(%{start: {id, _, _}}), do: id
  def spec_to_id(%{id: id}), do: id
  def spec_to_id(id) when is_atom(id), do: id

  ## Server callbacks

  @impl true
  def init(init_arg) do
    children = Keyword.fetch!(init_arg, :children)
    poll_interval = Keyword.get(init_arg, :poll_interval, @poll_interval)
    state = %__MODULE__{children: children, poll_interval: poll_interval}
    {:ok, state, {:continue, :check_children}}
  end

  @impl true
  def handle_continue(:check_children, state) do
    handle_info(:check_children, state)
  end

  # Registers (or replaces, when a spec with the same id is already known) a
  # child at runtime and evaluates it immediately instead of waiting for the
  # next poll.
  @impl true
  def handle_call({:add_child, %{ld_key: _, start: {_, _, _}} = spec}, _from, state) do
    id = spec_to_id(spec)
    children = Enum.reject(state.children, &(spec_to_id(&1) == id)) ++ [spec]
    state = check_spec_state(spec, %__MODULE__{state | children: children})
    {:reply, :ok, state}
  end

  def handle_call({:add_child, _invalid_spec}, _from, state) do
    {:reply, {:error, :invalid_child_spec}, state}
  end

  @impl true
  def handle_info(
        :check_children,
        %__MODULE__{children: children, poll_interval: interval} = state
      ) do
    schedule_next_loop(interval)
    {:noreply, Enum.reduce(children, state, &check_spec_state/2)}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %__MODULE__{ref_to_id: ref_to_id} = state) do
    case Map.fetch(ref_to_id, ref) do
      {:ok, id} ->
        state =
          state
          |> clear_monitor(id)
          |> clear_crash(id)
          |> record_exit(id, reason)

        {:noreply, state}

      :error ->
        # A monitor we no longer track (already cleared); nothing to record.
        {:noreply, state}
    end
  end

  # Ignore anything else so a stray message cannot take the monitor down.
  def handle_info(_message, state), do: {:noreply, state}

  ## Internals

  # A :normal exit is not a crash: the child may be started again on the next
  # poll. Any other reason is recorded so the child stays down until its flag
  # is cycled.
  defp record_exit(state, _id, :normal), do: state

  defp record_exit(state, id, crash) do
    Logger.error(
      "Process id #{id} backed by circuit breaker has crashed: #{inspect(crash)}",
      context: __MODULE__
    )

    put_crash(state, id)
  end

  defp schedule_next_loop(interval) when is_integer(interval) and interval >= 0 do
    Process.send_after(self(), :check_children, interval)
    :ok
  end

  # Polling disabled.
  defp schedule_next_loop(_disabled), do: :ok

  defp check_spec_state(%{ld_key: ld_key} = spec, state) do
    flag_on? = FeatureFlags.is_on?(ld_key, default: true)
    crashed? = has_crashed?(state, spec)
    running? = running?(spec)
    handle_states(flag_on?, crashed?, running?, state, spec)
  end

  # handle_states(flag_on?, has_crashed?, running?, state, spec)

  # If it's supposed to be on, hasn't crashed, and isn't running -> start it
  defp handle_states(true, false, false, state, spec) do
    Logger.info("Circuit breaker flag activated, starting up child with spec #{inspect(spec)}",
      context: __MODULE__
    )

    start_child(state, spec)
  end

  # If it's supposed to be on, _has_ crashed, but is still running -> clear the crash and re-monitor it
  # Reasoning: This should never happen, but if it does, tolerate it.
  defp handle_states(true, true, true, state, spec) do
    state = clear_crash(state, spec)

    case pid_for_spec(spec) do
      nil ->
        state

      pid ->
        # Drop any previous monitor first so we never hold two for one child.
        state
        |> clear_monitor(spec)
        |> monitor_pid(spec, pid)
    end
  end

  # If it's supposed to be stopped but hasn't yet, stop it.
  # Ignore whether or not it crashed (as we should clear crash state in stop_child)
  defp handle_states(false, _, true, state, spec) do
    stop_child(state, spec)
  end

  # If it's supposed to be stopped (and is stopped), but it's crashed, clear the crash state
  # This state shouldn't be reachable, but if it is, this is the right move.
  # No need to log this state as it doesn't mean anything at runtime
  defp handle_states(false, true, false, state, spec), do: clear_crash(state, spec)

  # Every other state should be a "steady state" with zero change.
  defp handle_states(_, _, _, state, _), do: state

  defp has_crashed?(%__MODULE__{crashes: crashes}, spec) do
    Map.has_key?(crashes, spec_to_id(spec))
  end

  defp running?(spec) do
    case pid_for_spec(spec) do
      pid when is_pid(pid) -> Process.alive?(pid)
      _ -> false
    end
  end

  defp start_child(state, spec) do
    state =
      state
      |> clear_crash(spec)
      |> clear_monitor(spec)

    case DynamicSupervisor.start_child(@dynamic_supervisor, spec) do
      {:ok, pid} ->
        monitor_pid(state, spec, pid)

      {:ok, pid, _info} ->
        monitor_pid(state, spec, pid)

      {:error, {:already_started, pid}} ->
        monitor_pid(state, spec, pid)

      # Any other failure and we don't start the child
      {:error, reason} ->
        Logger.warning(
          "Unable to start a child under the circuit breaker: #{inspect(reason)} with spec: #{inspect(spec)}",
          context: __MODULE__
        )

        state

      :ignore ->
        Logger.warning(
          "Unable to start a child under the circuit breaker: child process start function returned :ignore with spec: #{inspect(spec)}",
          context: __MODULE__
        )

        state
    end
  end

  defp stop_child(state, spec) do
    case pid_for_spec(spec) do
      nil ->
        state

      pid ->
        _ = DynamicSupervisor.terminate_child(@dynamic_supervisor, pid)

        state
        |> clear_crash(spec)
        |> clear_monitor(spec)
    end
  end

  defp monitor_pid(%__MODULE__{ref_to_id: ref_to_id, id_to_ref: id_to_ref} = state, spec, pid) do
    id = spec_to_id(spec)
    ref = Process.monitor(pid)

    %__MODULE__{
      state
      | ref_to_id: Map.put(ref_to_id, ref, id),
        id_to_ref: Map.put(id_to_ref, id, ref)
    }
  end

  defp put_crash(%__MODULE__{crashes: crashes} = state, spec) do
    id = spec_to_id(spec)
    clear_monitor(%__MODULE__{state | crashes: Map.put(crashes, id, true)}, spec)
  end

  defp clear_crash(%__MODULE__{crashes: crashes} = state, spec) do
    %__MODULE__{state | crashes: Map.delete(crashes, spec_to_id(spec))}
  end

  defp clear_monitor(%__MODULE__{ref_to_id: ref_to_id, id_to_ref: id_to_ref} = state, spec) do
    id = spec_to_id(spec)

    ref_to_id =
      case Map.get(id_to_ref, id) do
        nil ->
          ref_to_id

        ref ->
          # :flush also removes a :DOWN message that is already in the mailbox,
          # so a child we stopped on purpose is not reported as a crash later.
          Process.demonitor(ref, [:flush])
          Map.delete(ref_to_id, ref)
      end

    %__MODULE__{state | id_to_ref: Map.delete(id_to_ref, id), ref_to_id: ref_to_id}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment