Skip to content

Instantly share code, notes, and snippets.

@everilae
Last active August 29, 2015 14:19
Show Gist options
  • Save everilae/e8c52cab6df838e7fe2a to your computer and use it in GitHub Desktop.
Save everilae/e8c52cab6df838e7fe2a to your computer and use it in GitHub Desktop.
Learning Erlang, simple task queue
-module(mq).
-export([
start/0, disp_status/1, get_status/1, add_task/2, add_simple_worker/2,
add_worker/2, send_ack/1, echo_worker/0
]).
-spec start() -> pid().
%% @doc Start a new task queue process with no tasks and no workers,
%% and return its process id. The process is created with plain
%% `spawn/1', so it is neither linked to nor monitored by the caller.
start() ->
    QueuePid = spawn(fun() -> loop() end),
    QueuePid.
-spec disp_status(M :: pid()) -> ok.
%% @doc Ask queue `M' to print its pending tasks and idle workers to
%% standard output. This is fire-and-forget: `ok' is returned as soon as
%% the request has been sent, without waiting for `M' to act on it.
disp_status(M) ->
    erlang:send(M, status),
    ok.
% A task is an arbitrary term; the queue passes it to a worker untouched.
-type task() :: any().
% Body of a worker process: a zero-arity fun that never returns.
-type worker() :: fun(() -> none()).
-spec get_status(M :: pid()) -> {ok, {[task()], [pid()]}} | {error, timeout}.
%% @doc Get status of `M'.
%%
%% Sends a status request tagged with a fresh reference and waits for
%% the matching reply. On success returns `{ok, {Tasks, Workers}}', where
%% `Tasks' is the list of pending tasks in queue order and `Workers' is
%% the list of pids of the workers currently available for dispatch.
%%
%% Returns `{error, timeout}' if `M' has not answered within 1000 ms.
%% A reply that arrives after the timeout is not consumed here and stays
%% in the caller's mailbox.
get_status(M) ->
    Ref = make_ref(),
    M ! {status, self(), Ref},
    receive
        %% Both M and Ref are already bound, so only the reply to this
        %% very request is accepted.
        {M, Ref, Status} ->
            {ok, Status}
    after 1000 ->
        {error, timeout}
    end.
-spec add_task(M :: pid(), T :: task()) -> ok.
%% @doc Submit task `T' to queue `M'. The task is sent asynchronously;
%% `ok' is returned immediately and does not mean that the task has been
%% handled by a worker yet.
add_task(M, T) ->
    Request = {task, T},
    erlang:send(M, Request),
    ok.
-spec add_worker(M :: pid(), F :: worker()) -> ok.
%% @doc Run `F' in a freshly spawned process and register that process
%% as a worker of queue `M'. `F' is expected to receive `{M, Task}'
%% messages and to call `send_ack/1' after finishing each task.
add_worker(M, F) ->
    WorkerPid = spawn(F),
    M ! {worker, WorkerPid},
    ok.
-spec add_simple_worker(M :: pid(), F :: fun((task()) -> any())) -> ok.
%% @doc Spawn new worker wrapping `F' and add it to worker queue of `M'.
%%
%% The spawned process loops forever: it waits for a `{M, Task}' message,
%% calls `F(Task)', acknowledges to `M' via `send_ack/1' and waits again.
%% The return value of `F' is ignored. `F' must return normally for the
%% acknowledgement to be sent.
add_simple_worker(M, F) ->
    M ! {worker, spawn(fun W() ->
        receive
            %% M is bound by the enclosing function, so only tasks
            %% dispatched by this queue are accepted.
            {M, T} ->
                F(T),
                send_ack(M)
        end,
        W()
    end)},
    ok.
-spec send_ack(M :: pid()) -> ok.
%% @doc Tell queue `M' that the calling process has finished its task.
%% A worker has to call this after every task; otherwise `M' never puts
%% it back on the list of available workers and it gets no more tasks.
send_ack(M) ->
    Me = self(),
    M ! {ack, Me},
    ok.
-spec echo_worker() -> none().
%% @doc Example worker body. Waits for a `{Queue, Task}' message, prints
%% its own pid together with the task, acknowledges to the queue and
%% starts over. Never returns.
echo_worker() ->
    receive
        {Queue, Task} ->
            io:format("~w: ~w~n", [self(), Task]),
            send_ack(Queue),
            echo_worker()
    end.
-spec dispatch(T :: task(), W :: pid()) -> ok.
%% @doc Dispatch task `T' to the worker process `W'.
%%
%% The message has the shape `{QueuePid, T}', where `QueuePid' is the
%% pid of the calling process, so the worker knows where to send its
%% acknowledgement.
dispatch(T, W) ->
    W ! {self(), T},
    ok.
-spec wait(Tasks :: queue:queue(task()), Workers :: [pid()]) -> no_return().
%% @doc Wait for tasks and workers to handle said tasks. Serve status
%% information.
%%
%% Handled messages:
%%   `{task, T}'        - append `T' to the pending tasks;
%%   `{worker, W}'      - register the worker process `W';
%%   `{ack, W}'         - worker `W' is done and is made available again;
%%   `status'           - print pending tasks and available workers;
%%   `{status, P, Ref}' - send `{self(), Ref, {TaskList, Workers}}' to `P'.
%%
%% Every clause ends in its own call to `loop/2' so that the call is a
%% tail call. `loop/2' never returns, so a call that is followed by
%% further code would leave a stack frame behind for every message
%% handled and grow the process stack without bound.
wait(Tasks, Workers) ->
    receive
        {task, T} ->
            loop(queue:in(T, Tasks), Workers);
        {worker, W} ->
            loop(Tasks, [W | Workers]);
        {ack, W} ->
            loop(Tasks, [W | Workers]);
        status ->
            io:format("Tasks: ~w, Workers: ~w~n", [queue:to_list(Tasks), Workers]),
            loop(Tasks, Workers);
        {status, P, Ref} ->
            P ! {self(), Ref, {queue:to_list(Tasks), Workers}},
            loop(Tasks, Workers)
    end.
-spec loop() -> none().
% Entry point of the queue process: there are no tasks and no workers
% yet, so go straight to waiting for messages.
loop() ->
    NoTasks = queue:new(),
    NoWorkers = [],
    wait(NoTasks, NoWorkers).
-spec loop(Tasks :: queue:queue(task()), Workers :: [pid()]) -> no_return().
% Pair pending tasks with available workers until one of the two runs
% out, then go back to waiting for messages. Never returns.
%
% No workers available: tasks (if any) have to stay queued.
loop(Tasks, []) ->
    wait(Tasks, []);
% At least one worker available: hand it the oldest pending task, if any.
loop(Tasks, [Worker | RestWorkers] = Workers) ->
    case queue:out(Tasks) of
        {empty, _} ->
            wait(Tasks, Workers);
        {{value, T}, NewTasks} ->
            dispatch(T, Worker),
            % The worker stays off the list until it acknowledges.
            loop(NewTasks, RestWorkers)
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment