Skip to content

Instantly share code, notes, and snippets.

@bmizerany
Created December 9, 2008 22:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bmizerany/34116 to your computer and use it in GitHub Desktop.
Save bmizerany/34116 to your computer and use it in GitHub Desktop.
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..31da4cb
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+\#.*
+.#*
+*.beam
+include/rabbit_framing.hrl
+src/rabbit_framing.erl
+codegen/amqp_codegen.pyc
+codegen/json.pyc
diff --git a/ebin/rabbit.app b/ebin/rabbit.app
index d446a04..4166946 100644
--- a/ebin/rabbit.app
+++ b/ebin/rabbit.app
@@ -32,6 +32,7 @@
rabbit_ticket,
rabbit_tracer,
rabbit_writer,
+ rabbit_snoop,
tcp_acceptor,
tcp_acceptor_sup,
tcp_client_sup,
@@ -43,6 +44,7 @@
rabbit_persister,
rabbit_router,
rabbit_sup,
+ rabbit_snoop,
rabbit_tcp_client_sup]},
{applications, [kernel, stdlib, sasl, mnesia, os_mon]},
{mod, {rabbit, []}},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 2190029..0afeaca 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -61,6 +61,8 @@
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
+-record(snooper, {pid, dummy}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e65d532..5c964c2 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -175,6 +175,10 @@ start(normal, []) ->
ok = rabbit_networking:start_tcp_listener(Host, Port)
end,
TCPListeners)
+ end},
+ {"Snooper",
+ fun() ->
+ ok = start_child(rabbit_snoop)
end}]
++ ExtraSteps),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7716ef1..b88d073 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -183,6 +183,7 @@ attempt_delivery(Txn, Message, State) ->
deliver_or_enqueue(Txn, Message, State) ->
case attempt_delivery(Txn, Message, State) of
{true, NewState} ->
+ rabbit_snoop:log_message(delivered, Message),
{true, NewState};
{false, NewState} ->
persist_message(Txn, qname(State), Message),
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index ad796b6..0a1e608 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -73,6 +73,7 @@ Available commands:
force_reset
cluster <ClusterNode> ...
status
+ snoop - snoops all message deliveries (very Alpha)
add_user <UserName> <Password>
delete_user <UserName>
@@ -140,6 +141,11 @@ action(status, Node, []) ->
io:format("~n~p~n", [Res]),
ok;
+action(snoop, Node, []) ->
+ io:format("Snooping ... ~n", []),
+ rpc_call(Node, rabbit_snoop, start, [self()]),
+ rabbit_snoop:listen();
+
action(add_user, Node, Args = [Username, _Password]) ->
io:format("Creating user ~p ...", [Username]),
call(Node, {rabbit_access_control, add_user, Args});
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index b8b437b..c382134 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -137,7 +137,9 @@ table_definitions() ->
{record_name, amqqueue},
{attributes, record_info(fields, amqqueue)}]},
{amqqueue, [{attributes, record_info(fields, amqqueue)},
- {index, [pid]}]}].
+ {index, [pid]}]},
+ {snooper, [{attributes, record_info(fields, snooper)}]}
+ ].
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
diff --git a/src/rabbit_snoop.erl b/src/rabbit_snoop.erl
new file mode 100644
index 0000000..f496df7
--- /dev/null
+++ b/src/rabbit_snoop.erl
@@ -0,0 +1,162 @@
+%% Server API
+
+%%%-------------------------------------------------------------------
+%%% File : rabbit_snoop.erl
+%%% Author : blake <blake@blake-mizeranys-macbook-air.local>
+%%% Description :
+%%%
+%%% Created : 8 Dec 2008 by blake <blake@blake-mizeranys-macbook-air.local>
+%%%-------------------------------------------------------------------
+-module(rabbit_snoop).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([log_message/2, start/1, listen/0]).
+
+-include("rabbit.hrl").
+-include_lib("stdlib/include/qlc.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+log_message(Action, Message) ->
+ gen_server:cast(?SERVER, {log_message, Action, Message}).
+
+log(Message, Args) ->
+ gen_server:cast(?SERVER, {log, Message, Args}).
+
+start(Pid) ->
+ gen_server:cast(?SERVER, {start, Pid}).
+
+%% Client API
+
+listen() ->
+ receive
+ {snoop, Message} ->
+ io:format(Message, []),
+ listen()
+ end.
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+ process_flag(trap_exit, true),
+ {ok, na}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({start, Pid}, State) ->
+ rabbit_log:info("Starting snooper ~p~n", [Pid]),
+ link(Pid),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:write(#snooper{pid = Pid})
+ end
+ ),
+ {noreply, State};
+handle_cast({log, Message, Args}, State) ->
+ Message2 = io_lib:format(Message, Args),
+ Snoopers = mnesia:dirty_all_keys(snooper),
+ lists:foreach(fun (Snooper) ->
+ Snooper ! {snoop, Message2}
+ end,
+ Snoopers),
+ {noreply, State};
+handle_cast({log_message, Action, {basic_message, ExchangeName, RoutingKey, Content, PersistentKey}}, State) ->
+ Msg = "~n
+====== basic_message : ~p ======
+ExchangeName: ~p
+RoutingKey: ~p
+Content: ~p
+PersistentKey: ~p",
+
+ log(Msg, [Action, ExchangeName, RoutingKey, Content, PersistentKey]),
+ {noreply, State};
+handle_cast({log_message, Action, Message}, State) ->
+ log("~n====== unknown message : ~p ======~n~p~n", [Action, Message]),
+ {noreply, State};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({'EXIT', Pid, _Reason}, State) ->
+ on_pid_down(Pid),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+on_pid_down(Pid) ->
+ rabbit_log:info("Dropping Pid: ~p~n", [Pid]),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ mnesia:delete({snooper, Pid})
+ end).
+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment