Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fdmanana/543834 to your computer and use it in GitHub Desktop.
Save fdmanana/543834 to your computer and use it in GitHub Desktop.
-module(qteste).
-compile(export_all).
%% @doc Demo driver: exercises a multi-worker couch_work_queue with three
%% consumers and two producers.
%%
%% Creates the queue, starts the consumers, sleeps for two seconds, starts the
%% two linked producers, waits for both producers to exit and then closes the
%% queue.
%%
%% Returns `ok' when both producers exited with reason `normal'. If either
%% producer exited with any other reason the queue is still closed and the
%% call then fails with `error({producer_failed, [{Pid, Reason}]})' instead
%% of blocking forever on an 'EXIT' message that will never match.
%%
%% Side effect: sets `trap_exit' to `true' in the calling process and leaves
%% it set.
run() ->
    %% Exits of the linked producers must arrive as messages, not kill us.
    process_flag(trap_exit, true),
    {ok, Q} = couch_work_queue:new(
        [{max_size, 8}, {max_items, 2}, {multi_workers, true}]),
    %% The consumer pids are never used afterwards, so they are not bound.
    lists:foreach(
        fun(Name) -> consumer(Name, Q) end,
        ["Consumer_1", "Consumer_2", "Consumer_3"]),
    %% NOTE(review): presumably gives the consumers time to block in dequeue
    %% before anything is queued - confirm the intent.
    timer:sleep(2000),
    P1 = producer("Producer_1", Q),
    P2 = producer("Producer_2", Q),
    %% Evaluated left to right: wait for P1 first, then P2.
    Exits = [{P, await_exit(P)} || P <- [P1, P2]],
    %% Close the queue whatever the exit reasons were; consume_loop/2 stops
    %% when dequeue reports `closed'.
    io:format("Closing queue~n", []),
    couch_work_queue:close(Q),
    io:format("Queue closed~n", []),
    case [Exit || {_Pid, Reason} = Exit <- Exits, Reason =/= normal] of
        [] -> ok;
        Failures -> erlang:error({producer_failed, Failures})
    end.

%% Blocks until the 'EXIT' message for the linked process Pid arrives and
%% returns its exit reason. Requires the caller to be trapping exits.
await_exit(Pid) ->
    receive
        {'EXIT', Pid, Reason} -> Reason
    end.
%% @doc Starts a producer process linked to the caller and returns its pid.
%%
%% The new process prints a start-up line and then runs produce_loop/3 for
%% queue Q, beginning with item number 1.
producer(Name, Q) ->
    Body = fun() ->
        io:format("Producer ~p started~n", [Name]),
        produce_loop(Name, Q, 1)
    end,
    erlang:spawn_link(Body).
%% @doc Queues items number N, N+1, ... up to and including 4 on Q, then
%% prints an exit line and returns.
%%
%% Each item is the string Name ++ " " ++ integer_to_list(N), so Name must be
%% a string (list). producer/2 starts the loop at N = 1.
%% Crashes with badmatch if couch_work_queue:queue/2 returns anything but `ok'.
%% Returns the result of the final io:format/2 call (`ok').
produce_loop(Name, Q, N) when N < 5 ->
    %% The item is only built when it is actually going to be queued.
    Item = Name ++ " " ++ integer_to_list(N),
    io:format("Producer ~p queing item ~p~n", [Name, Item]),
    ok = couch_work_queue:queue(Q, Item),
    io:format("Producer ~p queued item ~p~n", [Name, Item]),
    produce_loop(Name, Q, N + 1);
produce_loop(Name, _Q, _N) ->
    io:format("Producer ~p exiting~n", [Name]).
%% @doc Starts a consumer process and returns its pid.
%%
%% The process is started with spawn/1, i.e. it is neither linked to nor
%% monitored by the caller. It prints a start-up line and then runs
%% consume_loop/2 on queue Q.
consumer(Name, Q) ->
    Worker = fun() ->
        io:format("Consumer ~p started~n", [Name]),
        consume_loop(Name, Q)
    end,
    erlang:spawn(Worker).
%% @doc Repeatedly dequeues from Q and prints what was received.
%%
%% After each `{ok, Items}' result the process sleeps for two seconds and
%% loops again. The loop ends, returning the result of io:format/2, when
%% dequeue returns `closed'. Any other result crashes with case_clause.
consume_loop(Name, Q) ->
    io:format("Consumer ~p waiting for item~n", [Name]),
    %% NOTE(review): the second argument (1) is presumably the maximum number
    %% of items to take per call - confirm against couch_work_queue:dequeue/2.
    Result = couch_work_queue:dequeue(Q, 1),
    case Result of
        closed ->
            io:format("Consumer ~p got closed queue, exiting~n", [Name]);
        {ok, Batch} ->
            io:format("Consumer ~p got items: ~p~n", [Name, Batch]),
            timer:sleep(2000),
            consume_loop(Name, Q)
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment