Skip to content

Instantly share code, notes, and snippets.

@alvesl
Created March 1, 2018 23:07
Show Gist options
  • Save alvesl/05959b76e9ac3372694563bf6e1918a3 to your computer and use it in GitHub Desktop.
Save alvesl/05959b76e9ac3372694563bf6e1918a3 to your computer and use it in GitHub Desktop.
defmodule MyApp.Manager do
use GenServer
use Timex
import Ecto.Query, warn: false
@cache_pending_groups_expiry 60 * 5
def start_link() do
Singleton.start_child(__MODULE__, %{supervised_workers: 0, groups: nil}, {MyApp.Manager, 1})
end
def init(state) do
groups = load_groups()
new_state = Map.put(state, :groups, groups)
schedule_workers(groups)
schedule_reporter() # Schedule work to be performed on start
{:ok, new_state}
end
# CLIENT API
def dismiss_worker(group_id) do
pid = :global.whereis_name({MyApp.Manager, 1})
GenServer.cast(pid, {:remove_worker, group_id})
end
def current_workers() do
pid = :global.whereis_name({MyApp.Manager, 1})
GenServer.call(pid, {:current_workers})
end
def force_report() do
pid = :global.whereis_name({MyApp.Manager, 1})
groups = load_groups()
GenServer.call(pid, {:force_report, groups})
end
def new_worker(group) do
pid = :global.whereis_name({MyApp.Manager, 1})
GenServer.call(pid, {:start_worker, group})
end
# SERVER API
def handle_call({:start_worker, group}, _from, state) do
start_worker(group)
new_state = Map.put(state, :groups, state.groups ++ [group])
{:reply, {:ok, group}, new_state}
end
def handle_call({:current_workers}, _from, state) do
{:reply, length(state.groups), state}
end
def handle_call({:force_report, groups}, _from, state) do
new_state = report(groups, state)
{:reply, :ok, new_state}
end
def handle_cast({:remove_worker, group_id}, state) do
new_state = Map.put(state, :groups, Enum.reject(state.groups, &(&1.id == group_id)))
{:noreply, new_state}
end
def handle_info(:report, state) do
new_state = report(load_groups(), state)
schedule_reporter() # Reschedule once more
{:noreply, new_state}
end
defp schedule_reporter() do
Process.send_after(self(), :report, 10 * 1000 * 60) # Every 10 min
end
defp schedule_workers(groups) do
Enum.each(groups, fn(group) ->
start_worker(group)
end)
end
# AUX FUNCTIONS (Some are omitted)
defp lookup(group_id) do
case Registry.lookup(Registry.MyApp, "group-#{group_id}") do
[{pid, _}] -> pid
_ -> nil
end
end
def start_worker(group) do
pid_name = {:via, Registry, {Registry.MyApp, "group-#{group.id}"}}
spawn(fn() -> MyApp.Worker.start_link(pid_name, group) end)
end
defp report(groups, state) do
# Does a bunch of stuff and then calls terminate_unhealthy
end
defp terminate_unhealthy(groups) do
# Terminate no longer valid groups
Enum.each(groups, fn(group) ->
pid = lookup(group.id)
if terminate?(pid) do
Process.exit(pid, :kill)
end
end)
end
defp schedule_groups(groups) when not is_nil(groups) do
Enum.each(groups, fn(group) ->
# Check if process is alive
pid = lookup(group.id)
if schedule?(pid) do
start_worker(group)
end
end)
end
defp schedule?(pid) when is_nil(pid), do: true
defp schedule?(pid), do: !Process.alive?(pid)
defp terminate?(pid), do: !schedule?(pid)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment