Skip to content

Instantly share code, notes, and snippets.

@jkvor
Created January 29, 2010 21:48
Show Gist options
  • Save jkvor/290176 to your computer and use it in GitHub Desktop.
Save jkvor/290176 to your computer and use it in GitHub Desktop.
-module(rabbit_test).
-compile(export_all).
-include_lib("rabbitmq_erlang_client/include/amqp_client.hrl").
-define(X, <<1,2,3,4>>).
-define(RoutingKey, <<5,6,7,8>>).
-define(Q, <<9,10,11,12>>).
%% -------------------------------------
%% SETUP QUEUE
%% -------------------------------------
setup_queue() ->
net_adm:world(),
Node = rabbit_node(),
Connection = rpc:call(Node, amqp_connection, start_direct, []),
Channel = amqp_connection:open_channel(Connection),
amqp_channel:call(Channel, #'exchange.declare'{exchange = ?X, type = <<"topic">>}),
amqp_channel:call(Channel, #'queue.declare'{queue = ?Q}),
Route = #'queue.bind'{queue = ?Q, exchange = ?X, routing_key = ?RoutingKey},
amqp_channel:call(Channel, Route),
amqp_channel:close(Channel),
amqp_connection:close(Connection),
ok.
%% -------------------------------------
%% PUBLISHER
%% -------------------------------------
start_pub() ->
proc_lib:start_link(?MODULE, init_pub, [self()]).
init_pub(Parent) ->
Channel = channel(),
proc_lib:init_ack(Parent, {ok, self()}),
loop_pub(Channel).
loop_pub(Channel) ->
Publish = #'basic.publish'{exchange = ?X, routing_key = ?RoutingKey},
ok = amqp_channel:call(Channel, Publish, #amqp_msg{payload = <<"abcdefg">>}),
timer:sleep(10),
loop_pub(Channel).
%% -------------------------------------
%% SUBSCRIBER
%% -------------------------------------
start_sub() ->
proc_lib:start_link(?MODULE, init_sub, [self()]).
init_sub(Parent) ->
Channel = channel(),
proc_lib:init_ack(Parent, {ok, self()}),
loop_sub(Channel).
loop_sub(Channel) ->
amqp_channel:call(Channel, #'basic.get'{queue = ?Q, no_ack = true}),
timer:sleep(10),
loop_sub(Channel).
%% -------------------------------------
%% UTIL FUNCTIONS
%% -------------------------------------
rabbit_node() -> hd(nodes()).
channel() ->
Node = rabbit_node(),
Connection = rpc:call(Node, amqp_connection, start_direct, []),
amqp_connection:open_channel(Connection).
length() ->
Channel = channel(),
{'queue.declare_ok', _, MC, _} = amqp_channel:call(Channel, #'queue.declare'{queue = ?Q}),
MC.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment