|
-module(gather_channel_info). |
|
|
|
-export([start/1, run/2]). |
|
|
|
-define(VERSION, "3.0.0"). |
|
|
|
start([Node, VirtualHostAtm, QueueNameAtm]) -> |
|
VirtualHost = list_to_binary(atom_to_list(VirtualHostAtm)), |
|
QueueName = list_to_binary(atom_to_list(QueueNameAtm)), |
|
prepare_module(Node), |
|
ok = rpc:call(Node, gather_channel_info, run, [VirtualHost, QueueName]), |
|
init:stop(). |
|
|
|
prepare_module(Node) -> |
|
File = "/tmp/gather_channel_info.txt", |
|
{ok, F} = file:open(File, [write]), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "~s ~s ~n", ["[INFO] VERSION:", ?VERSION]), |
|
StatusResult = rpc:call(Node, code, module_status, [?MODULE]), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] status of the ~p module: ~p~n", [?MODULE, StatusResult]), |
|
PurgeResult = rpc:call(Node, code, purge, [?MODULE]), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] result of purging the ~p module: ~p~n", [?MODULE, PurgeResult]), |
|
LoadResult = rpc:call(Node, code, load_file, [?MODULE]), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] result of loading the ~p module: ~p~n", [?MODULE, LoadResult]), |
|
file:close(F), |
|
Mode = 8#00400 + 8#00200 + 8#00040 + 8#00020 + 8#00004 + 8#00002, |
|
ok = file:change_mode(File, Mode). |
|
|
|
|
|
run(VirtualHost, QueueName) -> |
|
{ok, F} = file:open("/tmp/gather_channel_info.txt", [write, append]), |
|
%% Get queue information |
|
handle_pid_of(rabbit_amqqueue:pid_of(VirtualHost, QueueName), F, VirtualHost, QueueName). |
|
|
|
handle_pid_of({error, not_found}, F, VirtualHost, QueueName) -> |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[ERROR] Queue ~p on ~p NOT FOUND~n", [QueueName, VirtualHost]), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
file:close(F); |
|
handle_pid_of(QPid, F, VirtualHost, QueueName) -> |
|
QueueProcInfo = get_process_info(QPid, [dictionary]), |
|
QueueDict = proplists:get_value(dictionary, QueueProcInfo), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] Queue ~p on ~p process dictionary:~n~n~p~n", [QueueName, VirtualHost, QueueDict]), |
|
|
|
Pred = fun({{ch, _}, _}) -> |
|
true; |
|
(_) -> false |
|
end, |
|
QueueInfo = lists:filter(Pred, QueueDict), |
|
|
|
%% Print acktags and unsent_message_count from each channel bound to the queue |
|
ChannelDetails = [{Pid, AckTags, Unsent} || {{ch, Pid}, {cr, _, _, AckTags, _, _, _, Unsent}} <- QueueInfo], |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] Channel consumer details (Pid, AckTags, Unsent):~n~n~p~n", [ChannelDetails]), |
|
|
|
%% Get channel information bound to a queue called test-1 |
|
ChFun0 = fun({ChPid, _, _}, Acc0) -> |
|
ChInfo = rabbit_channel:info(ChPid), |
|
ConnPid0 = proplists:get_value(connection, ChInfo), |
|
ChProcInfo = get_process_info(ChPid), |
|
[{ChPid, {ch_info, ChInfo}, {ch_proc_info, ChProcInfo}, {conn_pid, ConnPid0}} | Acc0] |
|
end, |
|
ChInfos = lists:foldl(ChFun0, [], ChannelDetails), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] Channel infos:~n~n~p~n", [ChInfos]), |
|
|
|
%% Get connection information associated to those channels. |
|
ConnPidSet = sets:from_list([ConnPid1 || {_, _, _, {conn_pid, ConnPid1}} <- ChInfos]), |
|
ConnFun = fun(ConnPid2, Acc0) -> |
|
[get_process_info(ConnPid2) | Acc0] |
|
end, |
|
ConnProcInfos = sets:fold(ConnFun, [], ConnPidSet), |
|
io:fwrite(F, "~s~n", ["------------------------------------------------------------------------"]), |
|
io:fwrite(F, "[INFO] Connection process info:~n~n~p~n", [ConnProcInfos]), |
|
file:close(F). |
|
|
|
get_process_info(Pid) -> |
|
get_process_info(Pid, []). |
|
|
|
get_process_info(Pid, Args) when node() =:= node(Pid) -> |
|
erlang:process_info(Pid, Args); |
|
get_process_info(Pid, []) -> |
|
rpc:call(node(Pid), erlang, process_info, [Pid]); |
|
get_process_info(Pid, Args) -> |
|
rpc:call(node(Pid), erlang, process_info, [Pid, Args]). |