Skip to content

Instantly share code, notes, and snippets.

@lukebakken
Forked from MarcialRosales/Readme.md
Created July 19, 2018 15:52
Show Gist options
  • Save lukebakken/e728e7d798533c0c643261ddb8807b40 to your computer and use it in GitHub Desktop.
Save lukebakken/e728e7d798533c0c643261ddb8807b40 to your computer and use it in GitHub Desktop.
Gather consumer channel information from RabbitMQ in order to troubleshoot an issue with unexpected unacknowledged messages

Instructions

  • Copy the files to an instance in your deployment:
    bosh -d <yourDeploymentName> scp gather-info.sh rmq/b9b26ca8-6a15-4bf6-9881-68486042f46b:/root/gather-info.sh
    bosh -d <yourDeploymentName> scp gather_channel_info.erl rmq/b9b26ca8-6a15-4bf6-9881-68486042f46b:/root/gather_channel_info.erl
    
  • ssh into the same rabbitmq VM and become root:
    bosh -d <yourDeploymentName> ssh rmq/b9b26ca8-6a15-4bf6-9881-68486042f46b
    $ sudo -i
    #
    
  • Run the script. It takes two arguments, in this order: the vhost name and the queue name.
    # cd /root
    # chmod 755 ./gather-info.sh
    # ./gather-info.sh / durable825
    
  • The script writes its report to /tmp/gather_channel_info.txt. Please send that file to Pivotal Support.
#!/usr/bin/env bash
# Usage: gather-info.sh <vhost> <queue_name>
#
# Validates the two positional arguments. Diagnostics are written to stderr
# so they are not mixed into anything a caller captures from stdout.
set -o errexit

# The positional parameters are read with an empty default so that a missing
# argument reaches the friendly check below instead of aborting the shell.
readonly vhost="${1:-}"
if [[ -z $vhost ]]
then
    echo '[ERROR] first argument must be a vhost name' >&2
    exit 1
fi

readonly queue_name="${2:-}"
if [[ -z $queue_name ]]
then
    echo '[ERROR] second argument must be a queue name' >&2
    exit 1
fi
set -o nounset

# Resolve the physical directory that contains this script; the Erlang source
# file is expected to sit next to it.
readonly uname="$(uname)"
case "$uname" in
    Darwin)
        # macOS has no `readlink -f`, and recent releases no longer ship
        # /usr/bin/python, so resolve the directory with the shell itself.
        sdir="$(cd "$(dirname "$0")" && pwd -P)"
        ;;
    Linux)
        sdir="$(readlink -f "$(dirname "$0")")"
        ;;
    *)
        echo "[ERROR] unsupported system $uname" >&2
        exit 1
        ;;
esac
# Marked readonly separately so a failing command substitution above is not
# hidden behind the exit status of `readonly`.
readonly sdir

readonly erlsrc="$sdir/gather_channel_info.erl"
if [[ ! -f $erlsrc ]]
then
    echo "[ERROR] could not find gather_channel_info.erl in $sdir" >&2
    exit 1
fi
# Append the well-known package bin directories to PATH when they exist.
for maybe_bin_path in '/var/vcap/packages/rabbitmq-server/bin' '/var/vcap/packages/erlang/bin'
do
    if [[ -d $maybe_bin_path ]]
    then
        export PATH="$PATH:$maybe_bin_path"
    fi
done

# Export ERL_INETRC when the deployment provides an inetrc file.
if [[ -f /var/vcap/store/rabbitmq/erl_inetrc ]]
then
    export ERL_INETRC='/var/vcap/store/rabbitmq/erl_inetrc'
fi

# Every tool the rest of the script invokes must be resolvable; report the
# first missing one on stderr and stop.
for required_cmd in erl erlc rabbitmqctl
do
    if ! hash "$required_cmd" 2>/dev/null
    then
        echo "[ERROR] $required_cmd command must be in PATH" >&2
        exit 1
    fi
done
# Ask the local broker for its Erlang node name.
# The assignment is kept separate from `readonly` so that a failing
# rabbitmqctl is detected instead of being masked.
node_raw=''
if ! node_raw="$(rabbitmqctl eval 'node().')"
then
    echo '[ERROR] could not get local node name via rabbitmqctl' >&2
    exit 1
fi

# An atom that needs quoting (for example a host name containing '-' or '.')
# is printed as 'rabbit@my-host'. The value is later handed to `erl -s`, which
# would turn the quote characters into part of the node name, so drop them.
squote="'"
node="${node_raw//$squote/}"
readonly node
if [[ -z $node ]]
then
    echo '[ERROR] could not get local node name via rabbitmqctl' >&2
    exit 1
fi
# Use the first Erlang cookie file that exists among the known locations.
erlang_cookie_file=''
for maybe_cookie_file in '/var/vcap/store/rabbitmq/.erlang.cookie' '/home/vcap/.erlang.cookie' "$HOME/.erlang.cookie"
do
    if [[ -f $maybe_cookie_file ]]
    then
        erlang_cookie_file="$maybe_cookie_file"
        break
    fi
done
readonly erlang_cookie_file

if [[ -z $erlang_cookie_file ]]
then
    echo "[ERROR] could not find the .erlang.cookie file" >&2
    exit 1
fi
readonly beamname='gather_channel_info.beam'
readonly beamfile="$sdir/$beamname"

# Directories into which the compiled module is copied so that the broker
# node can load it. $RABBITMQ_EBIN is optional and only used when set.
rmq_ebin_dirs=(
    '/var/vcap/jobs/rabbitmq-server/packages/rabbitmq-server/ebin'
    '/var/vcap/packages/rabbitmq-server/ebin'
)
if [[ -n ${RABBITMQ_EBIN:-} ]]
then
    rmq_ebin_dirs+=("$RABBITMQ_EBIN")
fi
readonly rmq_ebin_dirs

# Removes the helper module from every ebin directory it may have been
# copied to. Safe to call when nothing was copied.
remove_installed_beam() {
    local ebin_dir
    for ebin_dir in "${rmq_ebin_dirs[@]}"
    do
        if [[ -d $ebin_dir ]]
        then
            rm -f "$ebin_dir/$beamname"
        fi
    done
}

# Compile next to the source. Without -o, erlc writes the .beam into the
# current working directory, which is only $sdir when the script happens to
# be started from its own directory.
rm -f "$beamfile"
erlc +debug_info -o "$sdir" "$erlsrc"

# From here on the module may exist inside the broker's code path. With
# errexit enabled a failing command would otherwise skip the cleanup, so it
# is tied to shell exit instead of being the last step of the script.
trap remove_installed_beam EXIT

rmq_ebin_copied='false'
for maybe_rmq_ebin in "${rmq_ebin_dirs[@]}"
do
    if [[ -d $maybe_rmq_ebin ]]
    then
        cp -f "$beamfile" "$maybe_rmq_ebin/$beamname"
        rmq_ebin_copied='true'
    fi
done

if [[ $rmq_ebin_copied != 'true' ]]
then
    echo "[ERROR] could not find an ebin dir to which to copy $beamfile" >&2
    exit 1
fi

# Start a short-lived Erlang node that calls gather_channel_info:start/1 with
# the broker node, vhost and queue name.
# NOTE(review): the cookie is passed on the command line and is therefore
# visible in the process list while erl runs.
erl -noshell -noinput -pa "$sdir" -sname "gather-channel-info-$$" -setcookie "$(< "$erlang_cookie_file")" -s gather_channel_info start "$node" "$vhost" "$queue_name"
-module(gather_channel_info).
-export([start/1, run/2]).
-define(VERSION, "3.0.0").
%% Entry point used by `erl -s gather_channel_info start Node VHost Queue'.
%%
%% All three arguments arrive as atoms. The vhost and queue names are turned
%% into binaries one byte per character, the report file is initialised and
%% the module reloaded on Node via prepare_module/1, and run/2 is then
%% executed on Node. Anything other than `ok' from the remote call crashes
%% with badmatch. The local runtime is stopped afterwards.
start([Node, VirtualHostAtm, QueueNameAtm]) ->
    [VirtualHost, QueueName] =
        [atom_to_binary(NameAtm, latin1) || NameAtm <- [VirtualHostAtm, QueueNameAtm]],
    prepare_module(Node),
    RemoteArgs = [VirtualHost, QueueName],
    ok = rpc:call(Node, gather_channel_info, run, RemoteArgs),
    init:stop().
%% Starts a fresh report file and reloads this module on Node.
%%
%% The report is truncated and opened for writing, the tool version is
%% recorded, and then three calls are made on Node in order: code:module_status/1,
%% code:purge/1 and code:load_file/1. Each result is written to the report
%% under its own separator line. Finally the file is closed and its mode is
%% set to 8#666 (read/write for owner, group and others).
prepare_module(Node) ->
    ReportPath = "/tmp/gather_channel_info.txt",
    Rule = "------------------------------------------------------------------------",
    {ok, Report} = file:open(ReportPath, [write]),
    io:fwrite(Report, "~s~n", [Rule]),
    io:fwrite(Report, "~s ~s ~n", ["[INFO] VERSION:", ?VERSION]),
    %% {function in the code module, format used to log its result}
    Steps = [{module_status, "[INFO] status of the ~p module: ~p~n"},
             {purge, "[INFO] result of purging the ~p module: ~p~n"},
             {load_file, "[INFO] result of loading the ~p module: ~p~n"}],
    lists:foreach(
      fun({CodeFun, Format}) ->
              Result = rpc:call(Node, code, CodeFun, [?MODULE]),
              io:fwrite(Report, "~s~n", [Rule]),
              io:fwrite(Report, Format, [?MODULE, Result])
      end,
      Steps),
    file:close(Report),
    ok = file:change_mode(ReportPath, 8#00666).
%% Appends the queue, channel and connection details for QueueName on
%% VirtualHost to the report file. start/1 invokes this through rpc:call/4.
%%
%% The report is opened in append mode, the queue pid is looked up with
%% rabbit_amqqueue:pid_of/2, and both are handed to handle_pid_of/4, whose
%% result is returned.
run(VirtualHost, QueueName) ->
    ReportPath = "/tmp/gather_channel_info.txt",
    {ok, Report} = file:open(ReportPath, [write, append]),
    PidOrError = rabbit_amqqueue:pid_of(VirtualHost, QueueName),
    handle_pid_of(PidOrError, Report, VirtualHost, QueueName).
%% Writes the report body for the result of the queue pid lookup and closes
%% the report file F. Returns the result of file:close/1.
%%
%% First clause: the queue does not exist, so only an error entry is written.
%% Second clause: QPid is the queue process. Its process dictionary is dumped,
%% followed by the per-channel consumer details found in it, the info of each
%% of those channels, and the process info of the connections owning them.
handle_pid_of({error, not_found}, F, VirtualHost, QueueName) ->
    Rule = "------------------------------------------------------------------------",
    io:fwrite(F, "~s~n", [Rule]),
    io:fwrite(F, "[ERROR] Queue ~p on ~p NOT FOUND~n", [QueueName, VirtualHost]),
    io:fwrite(F, "~s~n", [Rule]),
    file:close(F);
handle_pid_of(QPid, F, VirtualHost, QueueName) ->
    Rule = "------------------------------------------------------------------------",
    QueueDict = proplists:get_value(dictionary, get_process_info(QPid, [dictionary])),
    io:fwrite(F, "~s~n", [Rule]),
    io:fwrite(F, "[INFO] Queue ~p on ~p process dictionary:~n~n~p~n", [QueueName, VirtualHost, QueueDict]),
    %% Entries keyed {ch, Pid} carry an 8-element `cr' tuple; the 4th element
    %% is reported as the ack tags and the 8th as the unsent message count.
    %% NOTE(review): presumably this mirrors a record internal to the broker;
    %% entries with a different tuple size are silently skipped - confirm
    %% against the RabbitMQ version in use.
    ChannelDetails =
        [{ChannelPid, AckTags, Unsent}
         || {{ch, ChannelPid}, {cr, _, _, AckTags, _, _, _, Unsent}} <- QueueDict],
    io:fwrite(F, "~s~n", [Rule]),
    io:fwrite(F, "[INFO] Channel consumer details (Pid, AckTags, Unsent):~n~n~p~n", [ChannelDetails]),
    %% Collect channel info and process info for every consumer channel of
    %% this queue. The list is reversed relative to ChannelDetails.
    DescribeChannel =
        fun(Ch) ->
                ChInfo = rabbit_channel:info(Ch),
                OwnerPid = proplists:get_value(connection, ChInfo),
                ChProcInfo = get_process_info(Ch),
                {Ch, {ch_info, ChInfo}, {ch_proc_info, ChProcInfo}, {conn_pid, OwnerPid}}
        end,
    ChInfos = lists:reverse([DescribeChannel(ChPid) || {ChPid, _, _} <- ChannelDetails]),
    io:fwrite(F, "~s~n", [Rule]),
    io:fwrite(F, "[INFO] Channel infos:~n~n~p~n", [ChInfos]),
    %% Several channels can share a connection; a set keeps each connection
    %% pid once before its process info is fetched.
    ConnPidSet = sets:from_list([ConnPid || {_, _, _, {conn_pid, ConnPid}} <- ChInfos]),
    ConnProcInfos =
        sets:fold(fun(Conn, Collected) -> [get_process_info(Conn) | Collected] end,
                  [],
                  ConnPidSet),
    io:fwrite(F, "~s~n", [Rule]),
    io:fwrite(F, "[INFO] Connection process info:~n~n~p~n", [ConnProcInfos]),
    file:close(F).
%% Returns the default process information for Pid, as produced by
%% erlang:process_info/1 on the node that owns Pid.
get_process_info(Pid) ->
    get_process_info(Pid, []).

%% Returns process information for Pid, which may live on another node.
%%
%% An empty Items list means "the default set": erlang:process_info/1 is used.
%% Otherwise erlang:process_info/2 is called with Items. When Pid belongs to
%% a different node the call is made there through rpc:call/4, so the result
%% can also be {badrpc, Reason}.
%%
%% The empty-list clauses must come before the general ones:
%% erlang:process_info(Pid, []) is a valid call that returns [], so handling
%% a local pid with the general clause would silently yield no information.
get_process_info(Pid, []) when node() =:= node(Pid) ->
    erlang:process_info(Pid);
get_process_info(Pid, []) ->
    rpc:call(node(Pid), erlang, process_info, [Pid]);
get_process_info(Pid, Items) when node() =:= node(Pid) ->
    erlang:process_info(Pid, Items);
get_process_info(Pid, Items) ->
    rpc:call(node(Pid), erlang, process_info, [Pid, Items]).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment