Skip to content

Instantly share code, notes, and snippets.

@scvalex
Created July 20, 2011 20:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save scvalex/1095910 to your computer and use it in GitHub Desktop.
Save scvalex/1095910 to your computer and use it in GitHub Desktop.
Monitor a consumer and close its channel cleanly
-module(tc).
-compile([export_all]).
-include("include/amqp_client.hrl").
%% @doc Experiment: a consumer whose channel is closed cleanly by a helper
%% process when the consumer dies.
%%
%% Opens a connection and a channel (`OtherChannel'), then starts a consumer
%% process that opens its own channel, declares a queue, subscribes itself to
%% it and spawns a linked watcher. The watcher traps exits and calls
%% `amqp_channel:close/1' on the consumer's channel once the consumer exits.
%% The consumer is then told to exit with reason `kaput', and the function
%% reports whether `OtherChannel' goes down within 2 seconds.
%%
%% Returns the result of `amqp_connection:close/1'. Requires a reachable
%% broker for the default `#amqp_params_network{}' settings; any failed
%% setup step crashes with `badmatch'.
run2() ->
    %% Set up a connection with a channel of our own; the consumer below
    %% opens the second channel and declares the queue.
    {ok, Connection} = amqp_connection:start(#'amqp_params_network'{}),
    {ok, OtherChannel} = amqp_connection:open_channel(Connection),
    %% Start a consumer in a separate process that will exit(kaput)
    %% when it receives the message 'bye'. spawn_monitor/1 sets up the
    %% monitor as part of the spawn and gives us the reference to match on.
    {Pid, ConsumerRef} = spawn_monitor(
        fun () ->
            {ok, Channel} = amqp_connection:open_channel(Connection),
            #'queue.declare_ok'{queue = Q} =
                amqp_channel:call(Channel, #'queue.declare'{}),
            #'basic.consume_ok'{} =
                amqp_channel:subscribe(Channel,
                                       #'basic.consume'{queue = Q},
                                       self()),
            %% Set up a process to watch the consumer. It is linked to the
            %% consumer and traps exits, so the consumer's death arrives
            %% as an 'EXIT' message instead of killing the watcher.
            Self = self(),
            spawn_link(
                fun () ->
                    process_flag(trap_exit, true),
                    Self ! monitor_up,
                    receive
                        {'EXIT', Self, Reason} ->
                            io:format("consumer died with ~p!~n",
                                      [Reason]),
                            amqp_channel:close(Channel)
                    end
                end),
            %% Do not proceed until the watcher is trapping exits;
            %% otherwise an early exit could kill it before it can react.
            receive
                monitor_up -> ok
            end,
            receive
                bye -> exit(kaput)
            end
        end),
    %% Monitor the *other* channel as well (the consumer is already
    %% monitored via spawn_monitor/1 above).
    OtherRef = erlang:monitor(process, OtherChannel),
    %% Kill the consumer
    Pid ! bye,
    receive
        {'DOWN', ConsumerRef, process, Pid, InfoC} ->
            io:format("consumer went down with ~p~n", [InfoC])
    end,
    %% Wait for the *other* channel to die
    receive
        {'DOWN', OtherRef, process, OtherChannel, InfoP} ->
            io:format("other channel died with ~p~n", [InfoP])
    after 2000 ->
        io:format("other channel still going strong :p~n")
    end,
    %% Release the monitor and flush any pending 'DOWN' so that closing the
    %% connection below does not leave a stray message in the caller's
    %% mailbox.
    erlang:demonitor(OtherRef, [flush]),
    io:format("~nfin.~n"),
    amqp_connection:close(Connection).
%% @doc Experiment: a consumer that is linked directly to its channel.
%%
%% Opens a connection and a channel (`OtherChannel'), then starts a consumer
%% process that opens its own channel, declares a queue, subscribes itself to
%% it and links itself to that channel process. The consumer is then told to
%% exit with reason `kaput', so the exit signal propagates over the link to
%% its channel. The function reports whether `OtherChannel' goes down within
%% 2 seconds as a consequence.
%%
%% Returns the result of `amqp_connection:close/1'. Requires a reachable
%% broker for the default `#amqp_params_network{}' settings; any failed
%% setup step crashes with `badmatch'.
%% NOTE(review): if the abnormal channel exit also takes the connection down,
%% the final close may fail -- confirm against the client library.
run() ->
    %% Set up a connection with a channel of our own; the consumer below
    %% opens the second channel and declares the queue.
    {ok, Connection} = amqp_connection:start(#'amqp_params_network'{}),
    {ok, OtherChannel} = amqp_connection:open_channel(Connection),
    %% Start a consumer in a separate process that will exit(kaput)
    %% when it receives the message 'bye'. spawn_monitor/1 sets up the
    %% monitor as part of the spawn and gives us the reference to match on.
    {Pid, ConsumerRef} = spawn_monitor(
        fun () ->
            {ok, Channel} = amqp_connection:open_channel(Connection),
            #'queue.declare_ok'{queue = Q} =
                amqp_channel:call(Channel, #'queue.declare'{}),
            #'basic.consume_ok'{} =
                amqp_channel:subscribe(Channel,
                                       #'basic.consume'{queue = Q},
                                       self()),
            %% Tie the channel's fate to the consumer: the exit(kaput)
            %% below is delivered to the channel process as an exit signal.
            link(Channel),
            receive
                bye -> exit(kaput)
            end
        end),
    %% Monitor the *other* channel as well (the consumer is already
    %% monitored via spawn_monitor/1 above).
    OtherRef = erlang:monitor(process, OtherChannel),
    %% Kill the consumer
    Pid ! bye,
    receive
        {'DOWN', ConsumerRef, process, Pid, InfoC} ->
            io:format("consumer went down with ~p~n", [InfoC])
    end,
    %% Wait for the *other* channel to die
    receive
        {'DOWN', OtherRef, process, OtherChannel, InfoP} ->
            io:format("other channel died with ~p~n", [InfoP])
    after 2000 ->
        io:format("other channel still going strong :p~n")
    end,
    %% Release the monitor and flush any pending 'DOWN' so that closing the
    %% connection below does not leave a stray message in the caller's
    %% mailbox.
    erlang:demonitor(OtherRef, [flush]),
    io:format("~nfin.~n"),
    amqp_connection:close(Connection).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment