Created
September 4, 2019 20:56
-
-
Save joachimdb/72ca1b7c4630d5a20d25cc3d82c4c563 to your computer and use it in GitHub Desktop.
testing async_nolink task messages and shutdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Cryptod.Executor.TestTask do | |
use GenServer | |
require Logger | |
def start_link(state \\ %{}) do | |
GenServer.start_link(__MODULE__, state, name: :test_task) | |
end | |
def run_task(id, timeout) do | |
GenServer.call(:test_task, {:fire, id, timeout}) | |
end | |
def kill_task(id, timeout) do | |
GenServer.call(:test_task, {:kill, id, timeout}) | |
end | |
def init(_args) do | |
{:ok, %{}} | |
end | |
def handle_call({:fire, id, timeout}, _from, state) do | |
task = | |
Task.Supervisor.async_nolink( | |
Cryptod.Executor.APITaskSupervisor, | |
fn -> | |
Logger.debug("run_async_task start") | |
:timer.sleep(timeout) | |
Logger.debug("run_async_task end") | |
:ok | |
end | |
) | |
{:reply, :ok, Map.put(state, id, task)} | |
end | |
def handle_call({:kill, id, timeout}, _from, state) do | |
task = Map.get(state, id) | |
if task do | |
with {:ok, result} <- Task.shutdown(task, timeout) do | |
Logger.debug("task #{id} returned #{result} after shutdown") | |
{:reply, {:ok, result}, Map.delete(state, id)} | |
else | |
{:exit, reason} -> | |
Logger.debug("task #{id} exited with reason #{reason} after shutdown") | |
{:reply, {:exit, reason}, Map.delete(state, id)} | |
nil -> | |
Logger.debug("task #{id} still running after shutdown") | |
{:reply, nil, state} | |
end | |
else | |
{:reply, {:error, "NOT_FOUND"}, state} | |
end | |
end | |
def handle_info({ref, task_result}, state) do | |
# the effect of the following line is that we do not get a :DOWN message unless the task crashes | |
Process.demonitor(ref, [:flush]) | |
{task_id, task} = Enum.find(state, fn {id, task} -> ref == task.ref end) | |
Logger.debug("Got task results #{inspect(task_result)} from task #{task_id}") | |
{:noreply, Map.delete(state, task_id)} | |
end | |
def handle_info({:DOWN, ref, _process, _pid, reason}, state) do | |
task_id = Enum.find(state, fn {id, task} -> ref == task.ref end) | |
Logger.debug("Got :DOWN message for task #{task_id}") | |
{:noreply, Map.delete(state, task_id)} | |
end | |
end | |
# now do sth like: | |
# iex(27)> Cryptod.Executor.TestTask.run_task(1, 10000) | |
# [debug] run_async_task start | |
# :ok | |
# iex(28)> Cryptod.Executor.TestTask.kill_task(1, 100) | |
# [debug] task 1 still running after shutdown | |
# nil | |
# iex(29)> Cryptod.Executor.TestTask.kill_task(1, 100) | |
# [debug] task 1 exited with reason noproc after shutdown | |
# {:exit, :noproc} | |
# iex(30)> Cryptod.Executor.TestTask.kill_task(1, 100) | |
# {:error, "NOT_FOUND"} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment