Skip to content

Instantly share code, notes, and snippets.

@eskil
Created February 17, 2021 22:36
Show Gist options
  • Save eskil/6bc8d8313768d22c10d3a458aa0c986d to your computer and use it in GitHub Desktop.
Save eskil/6bc8d8313768d22c10d3a458aa0c986d to your computer and use it in GitHub Desktop.
Example of how to build a retryable wrapper around Task.Supervisor.async_stream_nolink, since Task.Supervisor does not retry failed tasks for you.
require Integer
require Logger
defmodule Processor do
  @moduledoc """
  Simulated flaky unit of work used to exercise the retry helper.
  """

  @doc """
  Pretends to process `state`.

  Sleeps for one second, samples the OS clock in milliseconds, logs it, and
  uses the parity of that timestamp to pick the outcome:

    * odd timestamp  - returns `{:ok, {state, timestamp}}`
    * even timestamp - picks one of the failure modes at random
      (`{:noretry, state}`, `{:retry, state}`, or killing the calling process)
  """
  def process(state) do
    :timer.sleep(1000)
    timestamp = :os.system_time(:millisecond)
    Logger.info("process(#{state}) -> #{timestamp}")

    if Integer.is_even(timestamp) do
      simulate_failure(state)
    else
      {:ok, {state, timestamp}}
    end
  end

  # Draws one of the 11 values in 0..10 and maps it to a failure mode:
  #   0      (1 in 11, ~9%)  -> informative {:noretry, state}
  #   1 or 2 (2 in 11, ~18%) -> hard failure: the calling process exits with :error
  #   3..10  (8 in 11, ~73%) -> {:retry, state}
  defp simulate_failure(state) do
    case Enum.random(0..10) do
      0 -> {:noretry, state}
      roll when roll in [1, 2] -> Process.exit(self(), :error)
      _ -> {:retry, state}
    end
  end
end
defmodule StreamHelper do
  @moduledoc """
  Retrying wrapper around `Task.Supervisor.async_stream_nolink/4`.

  The task function receives one input (its "state") and must return one of:

    * `{:ok, {state, result}}` - success; `{state, result}` is collected
    * `{:retry, state}` - run `state` again in the next round
    * `{:noretry, state}` - give up on `state`; it is collected as failed

  A task that the stream reports as `{:exit, reason}` does not hand back its
  state, so the state is recovered from the inputs of that round and collected
  as failed. Any other return value from the task function raises.
  """

  require Logger

  @doc """
  Runs `function` over `enumerable` under `supervisor`, retrying tasks that
  ask for it.

  ## Options

    * `:retries` - total number of rounds to attempt (default `3`). A value of
      `0` or less attempts nothing and reports every input as failed.

  All remaining options are passed to `Task.Supervisor.async_stream_nolink/4`
  on every round.

  Returns `{results, failed}` where `results` is a list of `{state, result}`
  tuples and `failed` is a list of states that were not completed.
  """
  def async_stream(supervisor, enumerable, function, opts \\ []) do
    {tries, opts} = Keyword.pop(opts, :retries, 3)
    async_stream(supervisor, enumerable, function, [], [], tries, opts)
  end

  @doc """
  Accumulator form of `async_stream/4`.

  `result_acc` and `failed_acc` are the results and failed states gathered so
  far, `tries` is the number of rounds still allowed, and `opts` are the
  `Task.Supervisor.async_stream_nolink/4` options used for every round.
  """
  def async_stream(supervisor, enumerable, function, result_acc, failed_acc, tries, opts \\ [])

  # Nothing left to run.
  def async_stream(_supervisor, [], _function, result_acc, failed_acc, _tries, _opts) do
    Logger.info("async_stream([]) -> done")
    {result_acc, failed_acc}
  end

  # Out of rounds: whatever is still pending counts as failed. `<=` (rather
  # than `==`) keeps a zero or negative starting value from retrying forever.
  def async_stream(_supervisor, enumerable, _function, result_acc, failed_acc, tries, _opts)
      when tries <= 0 do
    pending = Enum.to_list(enumerable)
    Logger.info("async_stream(#{fmt(pending)}, tries=#{tries}) no more retries")
    {result_acc, failed_acc ++ pending}
  end

  def async_stream(supervisor, enumerable, function, result_acc, failed_acc, tries, opts) do
    inputs = Enum.to_list(enumerable)
    Logger.info("async_stream(#{fmt(inputs)}, tries=#{tries})")

    events =
      supervisor
      |> Task.Supervisor.async_stream_nolink(inputs, function, opts)
      |> Enum.to_list()

    Logger.info("async_stream -> #{fmt(events)}")

    groups = Enum.group_by(events, &event_group/1, &event_value/1)
    Logger.info("groups -> #{fmt(groups)}")

    completed = Map.get(groups, :completed, [])
    retryable = Map.get(groups, :retry, [])
    notretryable = Map.get(groups, :noretry, [])
    failed = Map.get(groups, :failed, [])

    # Only this round's completions are subtracted: states completed in
    # earlier rounds are not part of `inputs`, and subtracting them could hide
    # a failed duplicate of an already-completed state.
    completed_states = Enum.map(completed, fn {state, _result} -> state end)

    # Failed tasks ({:exit, ...}) don't return their state, so take the input
    # states that no other outcome accounts for.
    failed_states = inputs -- (completed_states ++ retryable ++ notretryable)

    Logger.info("completed states -> #{fmt(completed_states)}")
    Logger.info("failed states -> #{fmt(failed_states)}")
    Logger.info("completed -> #{fmt(completed)}")
    Logger.info("retryable -> #{fmt(retryable)}")
    Logger.info("not retryable -> #{fmt(notretryable)}")
    Logger.info("failed -> #{fmt(failed)}")

    # `opts` is passed along so retry rounds keep the caller's stream options.
    async_stream(
      supervisor,
      retryable,
      function,
      result_acc ++ completed,
      failed_acc ++ notretryable ++ failed_states,
      tries - 1,
      opts
    )
  end

  # Classifies one stream event by outcome.
  defp event_group({:ok, {:ok, _}}), do: :completed
  defp event_group({:ok, {:retry, _}}), do: :retry
  defp event_group({:ok, {:noretry, _}}), do: :noretry
  defp event_group({:exit, _}), do: :failed

  # Extracts the payload kept for an event: the `{state, result}` tuple for a
  # completion, the state for retry/noretry, and the raw event otherwise.
  defp event_value({:ok, {:ok, result}}), do: result
  defp event_value({:ok, {:retry, state}}), do: state
  defp event_value({:ok, {:noretry, state}}), do: state
  defp event_value(event), do: event

  # Inspects a term for logging without rendering integer lists as charlists.
  defp fmt(term), do: inspect(term, charlists: :as_lists)
end
# Demo driver: start a named Task.Supervisor, then run the flaky processor
# over 1..10 with up to 3 rounds and at most 2 tasks in flight.
children = [{Task.Supervisor, name: MyTaskSupervisor}]

# `:restart` is a child-spec setting rather than a supervisor option, so it is
# not passed here. Matching on {:ok, _} makes a failed start crash right away
# instead of surfacing later as a missing MyTaskSupervisor.
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

result =
  StreamHelper.async_stream(MyTaskSupervisor, 1..10, &Processor.process/1,
    retries: 3,
    max_concurrency: 2
  )

Logger.info(inspect(result, charlists: :as_lists))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment