Created
July 20, 2011 20:53
-
-
Save scvalex/1095910 to your computer and use it in GitHub Desktop.
Monitor a consumer and close its channel cleanly
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(tc). | |
-compile([export_all]). | |
-include("include/amqp_client.hrl"). | |
run2() -> | |
%% Setup a connection with two channels and declare a queue | |
{ok, Connection} = amqp_connection:start(#'amqp_params_network'{}), | |
{ok, OtherChannel} = amqp_connection:open_channel(Connection), | |
%% Start a consumer in a separate thread that will exit(kaput) | |
%% when it receives the message 'bye' | |
Pid = spawn( | |
fun () -> | |
{ok, Channel} = amqp_connection:open_channel(Connection), | |
#'queue.declare_ok'{queue = Q} = | |
amqp_channel:call(Channel, #'queue.declare'{}), | |
#'basic.consume_ok'{} = | |
amqp_channel:subscribe(Channel, | |
#'basic.consume'{queue = Q}, | |
self()), | |
%% Setup a process to monitor the consumer | |
Self = self(), | |
spawn_link( | |
fun () -> | |
process_flag(trap_exit, true), | |
Self ! monitor_up, | |
receive | |
{'EXIT', Self, Reason} -> | |
io:format("consumer died with ~p!~n", | |
[Reason]), | |
amqp_channel:close(Channel) | |
end | |
end), | |
receive | |
monitor_up -> ok | |
end, | |
receive | |
bye -> exit(kaput) | |
end | |
end), | |
%% Monitor the consumer and the *other* channel | |
erlang:monitor(process, Pid), | |
erlang:monitor(process, OtherChannel), | |
%% Kill the consumer | |
Pid ! bye, | |
receive | |
{'DOWN', _, process, Pid, InfoC} -> | |
io:format("consumer went down with ~p~n", [InfoC]) | |
end, | |
%% Wait for the *other* channel to die | |
receive | |
{'DOWN', _, process, OtherChannel, InfoP} -> | |
io:format("other channel died with ~p~n", [InfoP]) | |
after 2000 -> | |
io:format("other channel still going strong :p~n") | |
end, | |
io:format("~nfin.~n"), | |
amqp_connection:close(Connection). | |
run() -> | |
%% Setup a connection with two channels and declare a queue | |
{ok, Connection} = amqp_connection:start(#'amqp_params_network'{}), | |
{ok, OtherChannel} = amqp_connection:open_channel(Connection), | |
%% Start a consumer in a separate thread that will exit(kaput) | |
%% when it receives the message 'bye' | |
Pid = spawn( | |
fun () -> | |
{ok, Channel} = amqp_connection:open_channel(Connection), | |
#'queue.declare_ok'{queue = Q} = | |
amqp_channel:call(Channel, #'queue.declare'{}), | |
#'basic.consume_ok'{} = | |
amqp_channel:subscribe(Channel, | |
#'basic.consume'{queue = Q}, | |
self()), | |
link(Channel), | |
receive | |
bye -> exit(kaput) | |
end | |
end), | |
%% Monitor the consumer and the *other* channel | |
erlang:monitor(process, Pid), | |
erlang:monitor(process, OtherChannel), | |
%% Kill the consumer | |
Pid ! bye, | |
receive | |
{'DOWN', _, process, Pid, InfoC} -> | |
io:format("consumer went down with ~p~n", [InfoC]) | |
end, | |
%% Wait for the *other* channel to die | |
receive | |
{'DOWN', _, process, OtherChannel, InfoP} -> | |
io:format("other channel died with ~p~n", [InfoP]) | |
after 2000 -> | |
io:format("other channel still going strong :p~n") | |
end, | |
io:format("~nfin.~n"), | |
amqp_connection:close(Connection). |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment