Skip to content

Instantly share code, notes, and snippets.

@videlalvaro
Last active December 24, 2015 15:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save videlalvaro/6819225 to your computer and use it in GitHub Desktop.
Save videlalvaro/6819225 to your computer and use it in GitHub Desktop.
include ../umbrella.mk
# Parameters of the nodes driven by the targets in this file. The `undefined`
# defaults are placeholders (presumably meant to be overridden on the make
# command line -- confirm against the callers of these targets).
MAIN_NODE=undefined
OTHER_NODE=undefined
OTHER_PORT=undefined
OTHER_PLUGINS=undefined

# TMPDIR normally comes from the environment. Fall back to /tmp when it is not
# set, otherwise every path derived from it would expand to a directory
# directly under the filesystem root.
TMPDIR ?= /tmp

# Per-node scratch directory and the pid file written by the other node.
BASEDIR=$(TMPDIR)/rabbitmq-topic-tests/$(OTHER_NODE)
PID_FILE=$(BASEDIR)/$(OTHER_NODE).pid
# Start the node named $(OTHER_NODE) in the background, with its mnesia, log,
# plugin-expansion and pid files kept under $(BASEDIR), then block until
# `rabbitmqctl wait` returns for its pid file.
# Declared phony because the target names an action, not a file to build.
.PHONY: start-other-node
start-other-node:
	rm -f $(PID_FILE)
	RABBITMQ_MNESIA_BASE=$(BASEDIR)/rabbitmq-$(OTHER_NODE)-mnesia \
	RABBITMQ_LOG_BASE=$(BASEDIR) \
	RABBITMQ_NODENAME=$(OTHER_NODE) \
	RABBITMQ_NODE_PORT=$(OTHER_PORT) \
	RABBITMQ_ENABLED_PLUGINS_FILE=$(OTHER_PLUGINS) \
	RABBITMQ_PLUGINS_DIR=$(TMPDIR)/rabbitmq-test/plugins \
	RABBITMQ_PLUGINS_EXPAND_DIR=$(BASEDIR)/rabbitmq-$(OTHER_NODE)-plugins-expand \
	RABBITMQ_PID_FILE=$(PID_FILE) \
	../rabbitmq-server/scripts/rabbitmq-server &
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) wait $(PID_FILE)
# Ask the node named $(OTHER_NODE) to stop. This is best effort: stderr is
# discarded and a failing rabbitmqctl does not fail the target.
# Declared phony because the target names an action, not a file to build.
.PHONY: stop-other-node
stop-other-node:
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) stop 2> /dev/null || true
# Join $(OTHER_NODE) to the cluster of $(MAIN_NODE): stop its application,
# issue join_cluster, then start the application again. Every step is best
# effort: stderr is discarded and failures do not fail the target.
# Declared phony because the target names an action, not a file to build.
.PHONY: cluster-other-node
cluster-other-node:
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) stop_app 2> /dev/null || true
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) join_cluster $(MAIN_NODE) 2> /dev/null || true
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) start_app 2> /dev/null || true
# Reset $(OTHER_NODE): stop its application, issue reset, then start the
# application again. Every step is best effort: stderr is discarded and
# failures do not fail the target.
# Declared phony because the target names an action, not a file to build.
.PHONY: reset-other-node
reset-other-node:
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) stop_app 2> /dev/null || true
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) reset 2> /dev/null || true
	../rabbitmq-server/scripts/rabbitmqctl -n $(OTHER_NODE) start_app 2> /dev/null || true
# Packages this one depends on (presumably consumed by the including build
# system -- confirm where DEPS is read).
DEPS:=rabbitmq-erlang-client
-module(rabbit_topic).
-behaviour(gen_server).
-include_lib("amqp_client/include/amqp_client.hrl").
%% API
-export([boot/0, start/0, start_link/0, stop/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% Name under which the server process is registered locally.
-define(SERVER, ?MODULE).
%% Server state.
%%   connection   - AMQP connection opened in init/1
%%   channel      - channel opened on that connection in init/1
%%   events_queue - name of the queue declared by setup_events_queue/1
%%   exchange     - exchange the events queue is bound to
%%   routing_key  - routing key used for that binding
%%   up_re        - compiled regex matching "rabbit on node ... up" payloads
%%   down_re      - compiled regex matching "rabbit on node ... down" payloads
-record(state, {connection, channel, events_queue, exchange,
routing_key, up_re, down_re}).
%% Boot step attribute: names boot/0 as the function to call and declares that
%% this step requires the `direct_client' step.
-rabbit_boot_step({rabbit_topic,
[{description, "topic client"},
{mfa, {?MODULE, boot, []}},
{requires, direct_client}]}).
%% @doc Entry point named by the boot step attribute.
%% Starts the server through a fully qualified call and always returns `ok';
%% the result of start/0 is discarded.
boot() ->
    _StartResult = ?MODULE:start(),
    ok.
%% @doc Starts the server without linking it to the caller and registers it
%% locally as ?SERVER. Returns whatever gen_server:start/4 returns.
start() ->
    RegisteredAs = {local, ?SERVER},
    gen_server:start(RegisteredAs, ?MODULE, [], []).
%% @doc Starts the server linked to the caller and registers it locally as
%% ?SERVER. Returns whatever gen_server:start_link/4 returns.
start_link() ->
    RegisteredAs = {local, ?SERVER},
    gen_server:start_link(RegisteredAs, ?MODULE, [], []).
%% @doc Sends the synchronous request `stop' to the server `Pid' and waits for
%% the reply without a timeout. How the request is answered is decided by
%% handle_call/3.
stop(Pid) ->
    Request = stop,
    gen_server:call(Pid, Request, infinity).
%% @doc gen_server callback: builds the initial server state.
%%
%% Subscribes the process to mnesia system events, compiles the two regexes
%% used to recognise "node up" / "node down" payloads, opens a direct AMQP
%% connection and a channel whose consumer module is `amqp_direct_consumer'
%% with this process as its argument, then declares and binds the events
%% queue and starts consuming from it.
%%
%% Any step that does not return the expected shape crashes init/1 with a
%% badmatch.
init([]) ->
    mnesia:subscribe(system),
    {ok, NodeUpRe} = re:compile("^rabbit on node (.*@.*) up\n\$"),
    {ok, NodeDownRe} = re:compile("^rabbit on node (.*@.*) down\n\$"),
    {ok, Conn} = amqp_connection:start(#amqp_params_direct{}),
    %% This process is handed to the consumer module -- presumably so that
    %% deliveries arrive as plain messages in handle_info/2.
    ConsumerSpec = {amqp_direct_consumer, [self()]},
    {ok, Chan} = amqp_connection:open_channel(Conn, ConsumerSpec),
    Ready = setup_events_queue(#state{connection  = Conn,
                                      channel     = Chan,
                                      exchange    = <<"amq.rabbitmq.log">>,
                                      routing_key = <<"info">>,
                                      up_re       = NodeUpRe,
                                      down_re     = NodeDownRe}),
    setup_consumer(Ready),
    {ok, Ready}.
%% @doc gen_server callback for synchronous requests.
%%
%% `stop' is the request issued by stop/1: the caller is answered with `ok'
%% and the server terminates with reason `normal', which makes gen_server
%% invoke terminate/2. Every other request is answered with `ok' and leaves
%% the state untouched.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%% @doc gen_server callback for asynchronous requests. No cast is acted upon:
%% the message is dropped and the state is returned unchanged.
handle_cast(_Ignored, ServerState) ->
    {noreply, ServerState}.
%% @doc gen_server callback for plain messages.
%%
%% A delivery from the events queue is printed and then classified with the
%% regexes held in the state: a "node up" or "node down" payload is reported
%% together with the node name it carries, anything else is ignored. Any other
%% message (for instance the mnesia system events subscribed to in init/1) is
%% only printed. The state is never modified.
handle_info({#'basic.deliver'{}, #amqp_msg{payload = Payload}},
            #state{up_re = UpRe, down_re = DownRe} = State) ->
    io:format("got message: ~p~n", [Payload]),
    case node_event(Payload, [{up, UpRe}, {down, DownRe}]) of
        {up, Node} ->
            %% TODO (carried over from the original code): declare queue
            io:format("node up: ~p~n", [Node]);
        {down, Node} ->
            io:format("node down: ~p~n", [Node]);
        none ->
            ok
    end,
    {noreply, State};
handle_info(Info, State) ->
    io:format("got info: ~p~n", [Info]),
    {noreply, State}.

%% Tries each `{Tag, CompiledRegex}' in order and returns `{Tag, Node}' for the
%% first regex that matches `Payload' with exactly one captured group, or
%% `none' when no regex matches.
%% The captured name is converted with list_to_existing_atom/1, so a payload
%% can never create a new atom; a name with no existing atom raises badarg.
node_event(_Payload, []) ->
    none;
node_event(Payload, [{Tag, Re} | Rest]) ->
    case re:run(Payload, Re, [{capture, all_but_first, binary}]) of
        {match, [NodeBin]} ->
            {Tag, list_to_existing_atom(binary_to_list(NodeBin))};
        _ ->
            node_event(Payload, Rest)
    end.
%% @doc gen_server callback invoked when the server terminates. Closes the
%% channel first, then the connection it was opened on, and returns `ok'.
terminate(_Why, #state{channel = Chan, connection = Conn}) ->
    amqp_channel:close(Chan),
    amqp_connection:close(Conn),
    ok.
%% @doc gen_server callback for code upgrades. The state needs no conversion,
%% so it is handed back unchanged.
code_change(_FromVsn, ServerState, _UpgradeArgs) ->
    {ok, ServerState}.
%% Declares a queue without giving it a name, binds it to the exchange and
%% routing key held in the state, and returns the state with `events_queue'
%% set to the queue name the broker answered with.
%%
%% The queue is declared exclusive: it is private to this server's connection
%% and is deleted by the broker when that connection closes. Without the flag
%% the queue would outlive the server and keep accumulating log messages that
%% nothing consumes.
%%
%% Crashes with a badmatch if the broker does not answer with the expected
%% `queue.declare_ok' / `queue.bind_ok' methods.
setup_events_queue(State = #state{channel = Channel, exchange = X, routing_key = RK}) ->
    #'queue.declare_ok'{queue = Q} =
        amqp_channel:call(Channel, #'queue.declare'{exclusive = true}),
    #'queue.bind_ok'{} =
        amqp_channel:call(Channel, #'queue.bind'{queue       = Q,
                                                 exchange    = X,
                                                 routing_key = RK}),
    State#state{events_queue = Q}.
%% Starts consuming from the events queue on the server's channel. Messages
%% are consumed with `no_ack = true', so no acknowledgements are sent.
%% Returns the result of amqp_channel:call/2 as is.
setup_consumer(#state{events_queue = Queue, channel = Chan}) ->
    Consume = #'basic.consume'{queue = Queue, no_ack = true},
    amqp_channel:call(Chan, Consume).
%% Application resource specification for rabbitmq_topic.
{application, rabbitmq_topic,
 [{description, "RabbitMQ Topic Management"},
  {vsn, "0.1.0"},
  {modules, []},
  %% rabbit_topic registers its server process locally under the module name;
  %% listing it here lets release tooling detect registered-name clashes.
  {registered, [rabbit_topic]},
  {applications, [kernel, stdlib, rabbit, amqp_client]}]}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment