Skip to content

Instantly share code, notes, and snippets.

@vladdu
Last active November 24, 2016 14:05
Show Gist options
  • Save vladdu/911a3ccccc6fa8b0aed08a93ec8fa37e to your computer and use it in GitHub Desktop.
Save vladdu/911a3ccccc6fa8b0aed08a93ec8fa37e to your computer and use it in GitHub Desktop.
Cancellable worker process
-module(cancellable_worker).
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% ====================================================================
%% API functions
%% ====================================================================
-export([
start/1,
start/3,
cancel/1,
cancel/2,
yield/1,
yield/2,
check/1
]).
%% Implement a worker process that can be cancelled and then may return a
%% partial answer.
%% The function doing the actual work takes as argument a Reporter function to
%% use to report results:
%% - Reporter(partial, Value) for a partial result
%% - Reporter(final, Value) for the whole result (if partial results are not
%% possible); do not report this after any partial values
%% If partial results are sent, they are aggregated in a list, which is returned
%% (oldest value first) by check/1, cancel/1,2 and yield/1,2.
%% Starts an unlinked, unregistered monitor process for WorkerFun.
%% WorkerFun is handed to init/1, which runs it in a separate worker process.
%% Returns whatever gen_server:start/3 returns.
start(WorkerFun) ->
    Options = [],
    gen_server:start(?MODULE, WorkerFun, Options).
%% Starts a monitor process whose worker runs Module:Function with the Reporter
%% fun prepended to Args, i.e. Module:Function(Reporter, Arg1, ..., ArgN).
%% init/1 always calls the worker fun with the Reporter as its single argument,
%% so the wrapper has to be a fun of arity 1; a 0-arity wrapper makes the worker
%% die with badarity before it does any work.
%% Returns the same value as start/1.
start(Module, Function, Args) ->
    start(fun(Reporter) -> apply(Module, Function, [Reporter | Args]) end).
%% check/1 asks the monitor process for the results collected so far.
%% It can return
%% - {partial, undefined} : the worker is running and has reported nothing yet
%% - {partial, Values} : the worker is running; Values are the partial values
%% reported so far, oldest first
%% - {final, Value} : the worker reported a final value, or the worker has
%% terminated, in which case Value is whatever was collected (undefined if
%% nothing was reported)
check(MonPid) when is_pid(MonPid) ->
    Request = check,
    gen_server:call(MonPid, Request).
%% Kills the worker, stops the monitor process and returns {ok, Results} with
%% whatever had been collected up to that point. Uses the default
%% gen_server:call/2 timeout.
cancel(MonPid) when is_pid(MonPid) ->
    Request = cancel,
    gen_server:call(MonPid, Request).
%% Same as cancel/1, but the caller chooses how long to wait for the answer
%% (Timeout is passed unchanged to gen_server:call/3).
cancel(MonPid, Timeout) when is_pid(MonPid) ->
    Request = cancel,
    gen_server:call(MonPid, Request, Timeout).
%% Waits until the worker has finished and returns {ok, Results}, where Results
%% is whatever the worker reported (undefined if it reported nothing).
%% The wait is bounded by the default gen_server:call/2 timeout.
%% TODO don't return partial/final
yield(MonPid) when is_pid(MonPid) ->
    Request = yield,
    gen_server:call(MonPid, Request).
%% Same as yield/1, but the caller chooses how long to wait for the worker
%% (Timeout is passed unchanged to gen_server:call/3).
yield(MonPid, Timeout) when is_pid(MonPid) ->
    Request = yield,
    gen_server:call(MonPid, Request, Timeout).
%% ====================================================================
%% Behavioural functions
%% ====================================================================
%% State of the monitor gen_server.
-record(state, {
%% pid of the worker process spawned in init/1
worker_pid,
%% collected results as {partial | final, Value}; {partial, undefined} until
%% the worker reports something (see merge_result/2)
results = {partial, undefined},
%% false, or the From of a yield call that is waiting for the worker to end
yielding = false,
%% true from init/1 until the worker's 'DOWN' message has been handled
worker_running = false
}).
%% gen_server callback. Spawns and monitors the worker process, which runs
%% WorkerFun(Reporter). Reporter(partial | final, Value) forwards each report
%% to this server as a cast; any other tag fails with function_clause in the
%% worker.
init(WorkerFun) ->
    Server = self(),
    Reporter = fun(Kind, Value) when Kind =:= partial; Kind =:= final ->
                   gen_server:cast(Server, {Kind, Value})
               end,
    {WorkerPid, _MonitorRef} = spawn_monitor(fun() -> WorkerFun(Reporter) end),
    {ok, #state{worker_pid = WorkerPid, worker_running = true}}.
%% gen_server callback for the synchronous API:
%% - check : replies with the collected results; once the worker has
%% terminated the reply is always tagged 'final'.
%% - cancel : kills the worker, replies {ok, Results} and stops the server.
%% - yield : replies {ok, Results} and stops the server if the worker has
%% already terminated; otherwise the caller is parked until the
%% worker's 'DOWN' message arrives (see handle_info/2). A later
%% yield replaces an earlier parked one.
%% - any other request is answered with {error, {unknown, Request}}.
handle_call(check, _From, State = #state{results = Results, worker_running = true}) ->
    {reply, adjust(Results), State};
handle_call(check, _From, State = #state{results = Results, worker_running = false}) ->
    %% The worker is gone, so whatever was collected is the final answer.
    {_, Collected} = adjust(Results),
    {reply, {final, Collected}, State};
handle_call(cancel, _From,
            State = #state{results = Results, worker_pid = Pid, yielding = Pending}) ->
    exit(Pid, kill),
    {_, Collected} = adjust(Results),
    Answer = {ok, Collected},
    %% A caller parked in yield would otherwise never be answered: the server
    %% stops here, so its call would fail with a server-down exit instead of
    %% returning the results collected so far.
    case Pending of
        false -> ok;
        _ -> gen_server:reply(Pending, Answer)
    end,
    {stop, normal, Answer, State};
handle_call(yield, _From, State = #state{worker_running = false, results = Results}) ->
    {_, Collected} = adjust(Results),
    {stop, normal, {ok, Collected}, State};
handle_call(yield, From, State) ->
    %% Worker still running: defer the reply until its 'DOWN' message.
    {noreply, State#state{yielding = From}};
handle_call(Request, _From, State) ->
    {reply, {error, {unknown, Request}}, State}.
%% gen_server callback. Merges a {partial, Value} or {final, Value} report
%% from the worker into the collected results; every other cast is ignored.
%% The first clause matches only well-formed reports so that the fallback
%% clause is actually reachable.
handle_cast({Kind, _Value} = Report, State = #state{results = Results})
  when Kind =:= partial; Kind =:= final ->
    {noreply, State#state{results = merge_result(Report, Results)}};
handle_cast(_Msg, State) ->
    {noreply, State}.
%% gen_server callback. Handles the monitor's 'DOWN' message for the worker.
%% - No caller is waiting in yield: just record that the worker has finished;
%% the results stay available to check/cancel/yield.
%% - A caller is waiting in yield: answer it with {ok, Results} and stop the
%% server, exactly as a yield issued after the worker finished does. Staying
%% alive here would leak the (unlinked) server process and keep a stale
%% From in the state.
%% Any other message is ignored.
handle_info({'DOWN', _Ref, process, Pid, _Reason},
            State = #state{worker_pid = Pid, yielding = false}) ->
    {noreply, State#state{worker_running = false}};
handle_info({'DOWN', _Ref, process, Pid, _Reason},
            State = #state{worker_pid = Pid, yielding = From, results = Results}) ->
    {_, Collected} = adjust(Results),
    gen_server:reply(From, {ok, Collected}),
    {stop, normal, State#state{worker_running = false, yielding = false}};
handle_info(_Info, State) ->
    {noreply, State}.
%% gen_server callback. Nothing to clean up; always returns ok.
terminate(_WhyStopped, _FinalState) ->
    ok.
%% gen_server callback. The state needs no conversion between versions, so it
%% is handed back unchanged.
code_change(_FromVersion, CurrentState, _UpgradeData) ->
    {ok, CurrentState}.
%% ====================================================================
%% Internal functions
%% ====================================================================
%% Internal representation of the collected results, {Kind, Store}:
%% - {partial, undefined} : nothing reported yet
%% - {Kind, [Newest, ..., Oldest]} : aggregated reports, newest first
%% - {final, {value, V}} : a sole 'final' report; V is wrapped so that
%% a final value that happens to be a list is
%% not mistaken for an aggregate and reversed
%%
%% adjust/1 converts the internal representation into the {Kind, Value} shape
%% handed to callers: aggregates are put in reporting order, a sole final value
%% is unwrapped, anything else is returned as is.
adjust({Kind, {value, Value}}) ->
    {Kind, Value};
adjust({Kind, Values}) when is_list(Values) ->
    {Kind, lists:reverse(Values)};
adjust({_Kind, _Other} = Results) ->
    Results.

%% merge_result/2 folds one worker report into the collected results.
%% Once the results are 'final', every further report is ignored.
merge_result({final, V}, {partial, undefined}) ->
    {final, {value, V}};
merge_result({partial, V}, {partial, undefined}) ->
    {partial, [V]};
merge_result({final, V}, {partial, R}) ->
    {final, [V | R]};
merge_result({partial, V}, {partial, R}) ->
    {partial, [V | R]};
merge_result(_V, R) ->
    R.
-module(cancellable_worker_tests).
-include_lib("eunit/include/eunit.hrl").
%% Starts a cancellable worker running Fun and applies Test to the monitor pid.
%% Returns Test's result; an exception raised by Test is turned into a value by
%% the old-style catch, so it shows up as a failed match in the assertion.
run_test(Fun, Test) ->
    {ok, Worker} = cancellable_worker:start(Fun),
    catch Test(Worker).
%% EUnit test generator. Each entry starts a fresh worker through run_test/2;
%% the first fun is the worker body (M is the Reporter), the second is the
%% client side, which receives the monitor pid W.
%% NOTE(review): the expectations rely on w/1 delays to order worker and client
%% actions, so they are timing-dependent.
run_test_() ->
[
%% --- check/1 ---
%% Worker reports nothing and returns at once.
%% NOTE(review): expects the worker to have terminated before check is served.
?_assertMatch({final, undefined},
run_test(fun(_M)-> ok end,
fun(W) -> cancellable_worker:check(W) end
)
),
%% One partial value, checked after the worker is done.
?_assertMatch({final, [v2]},
run_test(fun(M) -> M(partial, v2), ok end,
fun(W) -> w(10), cancellable_worker:check(W) end
)
),
%% Two partial values are returned in reporting order.
?_assertMatch({final, [v3, v4]},
run_test(fun(M) -> M(partial, v3), M(partial, v4), ok end,
fun(W) -> w(10), cancellable_worker:check(W) end
)
),
%% Checked before the worker has reported anything.
?_assertMatch({partial, undefined},
run_test(fun(M)-> w(10), M(partial, v5) end,
fun(W) -> cancellable_worker:check(W) end
)
),
%% A sole final value is returned as is.
?_assertMatch({final, v7},
run_test(fun(M) -> M(final, v7), ok end,
fun(W) -> w(10), cancellable_worker:check(W) end
)
),
%% A second final report is ignored.
?_assertMatch({final, v8},
run_test(fun(M) -> M(final, v8), M(final, v9), ok end,
fun(W) -> w(10), cancellable_worker:check(W) end
)
),
%% A partial report after a final one is ignored.
?_assertMatch({final, v10},
run_test(fun(M) -> M(final, v10), M(partial, v11), ok end,
fun(W) -> w(10), cancellable_worker:check(W) end
)
),
%% A final report after partial ones is appended to the aggregated list.
?_assertMatch({final, [v12, v13]},
run_test(fun(M) -> M(partial, v12), M(final, v13), ok end,
fun(W) -> w(10), cancellable_worker:check(W) end
)
),
%% Three checks spread over the worker's lifetime: one value, two values,
%% then the same two values tagged final once the worker has ended.
?_assertMatch({{partial, [v12a]}, {partial, [v12a, v13a]}, {final, [v12a, v13a]}},
run_test(fun(M) -> M(partial, v12a), w(20), M(partial, v13a), w(20), ok end,
fun(W) -> w(10), A=cancellable_worker:check(W),
w(30), B=cancellable_worker:check(W),
w(50), C=cancellable_worker:check(W),
{A, B, C}
end
)
),
%% --- cancel/1 ---
%% Cancelled between the two reports: only the first value is returned.
?_assertMatch({ok, [v14]},
run_test(fun(M) -> M(partial, v14), w(50), M(partial, v15), ok end,
fun(W) -> w(10), cancellable_worker:cancel(W) end
)
),
%% Cancelled after the worker has finished: both values are returned.
?_assertMatch({ok, [v16, v17]},
run_test(fun(M) -> M(partial, v16), w(10), M(partial, v17), ok end,
fun(W) -> w(30), cancellable_worker:cancel(W) end
)
),
%% Worker dies half-way; what it reported before is still available.
%% NOTE(review): crash:crash() is presumably an undefined function used to
%% force a crash -- confirm no module named 'crash' is on the code path.
?_assertMatch({final, [v18]},
run_test(fun(M) -> M(partial, v18), w(5), crash:crash(), w(5), M(partial, v19), ok end,
fun(W) -> w(20), cancellable_worker:check(W) end
)
),
%% --- yield/1 ---
%% Worker reports nothing.
?_assertMatch({ok, undefined},
run_test(fun(_M)-> ok end,
fun(W) -> cancellable_worker:yield(W) end
)
),
%% yield issued after the worker has had time to finish.
?_assertMatch({ok, [v6]},
run_test(fun(M) -> M(partial, v6) end,
fun(W) -> w(10), cancellable_worker:yield(W) end
)
),
?_assertMatch({ok, [v19, v20]},
run_test(fun(M) -> M(partial, v19), M(partial, v20) end,
fun(W) -> w(10), cancellable_worker:yield(W) end
)
),
%% yield issued while the worker is still running: the reply is deferred
%% until the worker ends.
?_assertMatch({ok, [v21]},
run_test(fun(M) -> w(10), M(partial, v21) end,
fun(W) -> cancellable_worker:yield(W) end
)
),
%% Partial followed by final, collected through yield.
?_assertMatch({ok, [v22, v23]},
run_test(fun(M) -> M(partial, v22), M(final, v23) end,
fun(W) -> cancellable_worker:yield(W) end
)
),
%% Worker dies half-way; yield still returns what was reported before.
?_assertMatch({ok, [v24]},
run_test(fun(M) -> M(partial, v24), w(5), crash:crash(), M(partial, v25), ok end,
fun(W) -> cancellable_worker:yield(W) end
)
),
%% Trailing no-op entry, so every real case above can end with a comma.
?_assert(true)
].
%% Suspends the calling process for N milliseconds and returns ok.
w(N) ->
    timer:sleep(N).
@vladdu
Copy link
Author

vladdu commented Nov 24, 2016

@okeuday Thanks, I see your point, but I think my worker here is actually your controlling process. At least, that is the case in this latest version, where it does little more than gather partial results and check that the real worker has finished.

My use case is for a server that waits for commands from the UI and executes them by spawning one of these processes. The user or other commands might request cancellation of pending operations. The UI can retrieve partial results when the operation takes a long time, to keep the user happy.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment