Skip to content

Instantly share code, notes, and snippets.

@jbrisbin
Created January 31, 2011 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jbrisbin/804058 to your computer and use it in GitHub Desktop.
Save jbrisbin/804058 to your computer and use it in GitHub Desktop.
Errors I'm getting trying to persist RabbitMQ messages to an external store.
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2011 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2011 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2011 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-module(rabbit_riak_queue).
-export(
[start/1, stop/0, init/3, terminate/1, delete_and_terminate/1, purge/1,
publish/3, publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3,
tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1,
idle_timeout/1, handle_pre_hibernate/1, status/1]).
%%----------------------------------------------------------------------------
%% This is a simple implementation of the rabbit_backing_queue
%% behavior, with all msgs stored in Riak.
%%
%% It is based on the great work of John DeTreville, who provided a solid
%% foundation for this module.
%% ----------------------------------------------------------------------------
%%----------------------------------------------------------------------------
%% This module wraps messages into M records for internal use,
%% containing the messages themselves and additional
%% information. Pending acks are also recorded in memory as M records.
%%
%% All queues are non-durable in this version, and all messages are
%% transient (non-persistent). (This breaks some Java tests for
%% durable queues.)
%%----------------------------------------------------------------------------
-behaviour(rabbit_backing_queue).
-record(s, % The in-RAM queue state
{ q, % A temporary in-RAM queue of Ms
next_seq_id, % The next seq_id to use to build an M
pending_ack_dict, % Map from seq_id to M, pending ack
txn_dict, % Map from txn to tx, in progress
name % The name of this queue
}).
-record(m, % A wrapper aroung a msg
{ seq_id, % The seq_id for the msg
msg, % The msg itself
props, % The message properties
is_delivered % Has the msg been delivered? (for reporting)
}).
-record(tx,
{ to_pub,
to_ack }).
-include("rabbit_riak_queue.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
%%----------------------------------------------------------------------------
%% BUG: Restore -ifdef, -endif.
%% -ifdef(use_specs).
-type(seq_id() :: non_neg_integer()).
-type(ack() :: seq_id() | 'blank_ack').
-type(s() :: #s { next_seq_id :: seq_id(),
pending_ack_dict :: dict() }).
-type(state() :: s()).
-type(m() :: #m { msg :: rabbit_types:basic_message(),
seq_id :: seq_id(),
props :: rabbit_types:message_properties(),
is_delivered :: boolean() }).
-type(tx() :: #tx { to_pub :: [{rabbit_types:basic_message(),
rabbit_types:message_properties()}],
to_ack :: [seq_id()] }).
-include_lib("rabbit_common/include/rabbit_backing_queue_spec.hrl").
%% -endif.
%%----------------------------------------------------------------------------
%% Public API
%%
%% Specs are in rabbit_backing_queue_spec.hrl but are repeated here.
%%----------------------------------------------------------------------------
%% start/1 promises that a list of (durable) queue names will be
%% started in the near future. This lets us perform early checking
%% necessary for the consistency of those queues or initialise other
%% shared resources.
%%
%% This function should be called only from outside this module.
%%
%% -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
start(_DurableQueues) ->
rabbit_log:info("start(_) ->"),
Config = case application:get_env(riak_msg_store) of
{ok, C} -> C;
_ -> []
end,
ok = rabbit_sup:start_child(?MSG_STORE, riak_msg_store, [?MSG_STORE, Config]),
rabbit_log:info(" -> ok"),
ok.
%%----------------------------------------------------------------------------
%% stop/0 tears down all state/resources upon shutdown. It might not
%% be called.
%%
%% This function should be called only from outside this module.
%%
%% -spec(stop/0 :: () -> 'ok').
stop() ->
rabbit_log:info("stop(_) ->"),
ok = rabbit_sup:stop_child(?MSG_STORE),
rabbit_log:info(" -> ok"),
ok.
%%----------------------------------------------------------------------------
%% init/3 creates one backing queue, returning its state. Names are
%% local to the vhost, and need not be unique.
%%
%% -spec(init/3 ::
%% (rabbit_amqqueue:name(), is_durable(), attempt_recovery())
%% -> state()).
%%
%% This function should be called only from outside this module.
%% BUG: Need to provide better back-pressure when queue is filling up.
init(QueueName, _IsDurable, _Recover) ->
rabbit_log:info("init(~p, _, _) ->", [QueueName]),
Result = #s { next_seq_id = 0,
pending_ack_dict = dict:new(),
txn_dict = dict:new(),
name = QueueName },
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% terminate/1 is called on queue shutdown when the queue isn't being
%% deleted.
%%
%% This function should be called only from outside this module.
%%
%% -spec(terminate/1 :: (state()) -> state()).
terminate(S) ->
Result = remove_acks_state(S),
rabbit_log:info("terminate(~p) ->", [S]),
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% delete_and_terminate/1 is called when the queue is terminating and
%% needs to delete all its content. The only difference between purge
%% and delete is that delete also needs to delete everything that's
%% been delivered and not ack'd.
%%
%% This function should be called only from outside this module.
%%
%% -spec(delete_and_terminate/1 :: (state()) -> state()).
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(S) ->
rabbit_log:info("delete_and_terminate(~p) ->", [S]),
%Result = remove_acks_state(S #s { q = queue:new() }),
%rabbit_log:info(" -> ~p", [Result]),
S.
%%----------------------------------------------------------------------------
%% purge/1 removes all messages in the queue, but not messages which
%% have been fetched and are pending acks.
%%
%% This function should be called only from outside this module.
%%
%% -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
purge(S) ->
rabbit_log:info("purge(~p) ->", [S]),
%Result = {queue:len(Q), S #s { q = queue:new() }},
%rabbit_log:info(" -> ~p", [Result]),
{0, S}.
%%----------------------------------------------------------------------------
%% publish/3 publishes a message.
%%
%% This function should be called only from outside this module. All
%% msgs are silently reated as non-persistent.
%%
%% -spec(publish/3 ::
%% (rabbit_types:basic_message(),
%% rabbit_types:message_properties(),
%% state())
%% -> state()).
publish(Msg, Props, S) ->
rabbit_log:info("publish("),
rabbit_log:info(" ~p,", [Msg]),
rabbit_log:info(" ~p,", [Props]),
rabbit_log:info(" ~p) ->", [S]),
Result = publish_state(Msg, Props, false, S),
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% publish_delivered/4 is called for messages which have already been
%% passed straight out to a client. The queue will be empty for these
%% calls (i.e. saves the round trip through the backing queue). All
%% msgs are silently treated as non-persistent.
%%
%% This function should be called only from outside this module.
%%
%% -spec(publish_delivered/4 ::
%% (ack_required(),
%% rabbit_types:basic_message(),
%% rabbit_types:message_properties(),
%% state())
%% -> {ack(), state()}).
publish_delivered(false, _, _, S) ->
rabbit_log:info("publish_delivered(false, _, _,"),
rabbit_log:info(" ~p) ->", [S]),
Result = {blank_ack, S},
rabbit_log:info(" -> ~p", [Result]),
Result;
publish_delivered(true, Msg, Props, S = #s { next_seq_id = SeqId }) ->
rabbit_log:info("publish_delivered(true, "),
rabbit_log:info(" ~p,", [Msg]),
rabbit_log:info(" ~p,", [Props]),
rabbit_log:info(" ~p) ->", [S]),
Result =
{SeqId,
(record_pending_ack_state(
((m(Msg, SeqId, Props)) #m { is_delivered = true }), S))
#s { next_seq_id = SeqId + 1 }},
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% dropwhile/2 drops messages from the head of the queue while the
%% supplied predicate returns true.
%%
%% This function should be called only from outside this module.
%%
%% -spec(dropwhile/2 ::
%% (fun ((rabbit_types:message_properties()) -> boolean()), state())
%% -> state()).
dropwhile(Pred, S) ->
rabbit_log:info("dropwhile(~p, ~p) ->", [Pred, S]),
{_, S1} = dropwhile_state(Pred, S),
Result = S1,
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% fetch/2 produces the next message.
%%
%% This function should be called only from outside this module.
%%
%% -spec(fetch/2 :: (ack_required(), state()) ->
%% {ok | fetch_result(), state()}).
fetch(AckRequired, S = #s { name = QueueName }) ->
rabbit_log:info("fetch(~p, ~p) ->", [AckRequired, S]),
{Msg, SeqId} = riak_msg_store:pop(?MSG_STORE, QueueName, AckRequired),
rabbit_log:info(" fetch() -> ~p", [Msg]),
Len = riak_msg_store:len(?MSG_STORE, QueueName),
{Ack, S1} =
case AckRequired of
true ->
{SeqId, record_pending_ack_state(
(m(Msg, SeqId, #message_properties{expiry = undefined, needs_confirming = false})) #m {
is_delivered = true }, S)};
false -> {blank_ack, S}
end,
{{Msg, true, Ack, Len}, S1}.
-spec internal_queue_out(fun ((m(), state()) -> T), state()) ->
{empty, state()} | T.
internal_queue_out(F, S = #s { q = Q }) ->
case queue:out(Q) of
{empty, _} -> {empty, S};
{{value, M}, Qa} -> F(M, S #s { q = Qa })
end.
-spec internal_fetch/3 :: (ack_required(), m(), s()) -> {fetch_result(), s()}.
internal_fetch(AckRequired,
M = #m {
seq_id = SeqId,
msg = Msg,
is_delivered = IsDelivered },
S = #s { q = Q }) ->
{Ack, S1} =
case AckRequired of
true ->
{SeqId,
record_pending_ack_state(
M #m { is_delivered = true }, S)};
false -> {blank_ack, S}
end,
{{Msg, IsDelivered, Ack, queue:len(Q)}, S1}.
%%----------------------------------------------------------------------------
%% ack/2 acknowledges messages names by SeqIds. Maps SeqIds to guids
%% upon return.
%%
%% This function should be called only from outside this module.
%%
%% The following spec is wrong, as a blank_ack cannot be passed back in.
%%
%% -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
ack(SeqIds, S) ->
rabbit_log:info("ack("),
rabbit_log:info("~p,", [SeqIds]),
rabbit_log:info(" ~p) ->", [S]),
{Guids, S1} = internal_ack(SeqIds, S),
Result = {Guids, S1},
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% tx_publish/4 is a publish, but in the context of a transaction. It
%% stores the message and its properties in the to_pub field of the txn,
%% waiting to be committed.
%%
%% This function should be called only from outside this module.
%%
%% -spec(tx_publish/4 ::
%% (rabbit_types:txn(),
%% rabbit_types:basic_message(),
%% rabbit_types:message_properties(),
%% state())
%% -> state()).
tx_publish(Txn, Msg, Props, S) ->
rabbit_log:info("tx_publish(~p, ~p, ~p, ~p) ->", [Txn, Msg, Props, S]),
Tx = #tx { to_pub = Pubs } = lookup_tx(Txn, S),
Result = store_tx(Txn, Tx #tx { to_pub = [{Msg, Props} | Pubs] }, S),
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% tx_ack/3 acks, but in the context of a transaction. It stores the
%% seq_id in the acks field of the txn, waiting to be committed.
%%
%% This function should be called only from outside this module.
%%
%% The following spec is wrong, as a blank_ack cannot be passed back in.
%%
%% -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
tx_ack(Txn, SeqIds, S) ->
rabbit_log:info("tx_ack(~p, ~p, ~p) ->", [Txn, SeqIds, S]),
Tx = #tx { to_ack = SeqIds0 } = lookup_tx(Txn, S),
Result =
store_tx(Txn, Tx #tx { to_ack = lists:append(SeqIds, SeqIds0) }, S),
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% tx_rollback/2 undoes anything which has been done in the context of
%% the specified transaction. It returns the state with to_pub and
%% to_ack erased.
%%
%% This function should be called only from outside this module.
%%
%% The following spec is wrong, as a blank_ack cannot be passed back in.
%%
%% -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
tx_rollback(Txn, S) ->
rabbit_log:info("tx_rollback(~p, ~p) ->", [Txn, S]),
#tx { to_ack = SeqIds } = lookup_tx(Txn, S),
Result = {SeqIds, erase_tx(Txn, S)},
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% tx_commit/4 commits a transaction. The F passed in must be called
%% once the messages have really been commited. This CPS permits the
%% possibility of commit coalescing.
%%
%% This function should be called only from outside this module.
%%
%% The following spec is wrong, blank_acks cannot be returned.
%%
%% -spec(tx_commit/4 ::
%% (rabbit_types:txn(),
%% fun (() -> any()),
%% message_properties_transformer(),
%% state())
%% -> {[ack()], state()}).
tx_commit(Txn, F, PropsF, S) ->
rabbit_log:info(
"tx_commit(~p, ~p, ~p, ~p) ->", [Txn, F, PropsF, S]),
#tx { to_ack = SeqIds, to_pub = Pubs } = lookup_tx(Txn, S),
Result = {SeqIds, tx_commit_state(Pubs, SeqIds, PropsF, erase_tx(Txn, S))},
F(),
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% requeue/3 reinserts messages into the queue which have already been
%% delivered and were pending acknowledgement.
%%
%% This function should be called only from outside this module.
%%
%% The following spec is wrong, as blank_acks cannot be passed back in.
%%
%% -spec(requeue/3 ::
%% ([ack()], message_properties_transformer(), state()) -> state()).
requeue(SeqIds, PropsF, S) ->
rabbit_log:info("requeue(~p, ~p, ~p) ->", [SeqIds, PropsF, S]),
{_, S1} =
internal_ack3(
fun (#m { msg = Msg, props = Props }, Si) ->
publish_state(Msg, PropsF(Props), true, Si)
end,
SeqIds,
S),
Result = S1,
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% len/1 returns the queue length.
%%
%% -spec(len/1 :: (state()) -> non_neg_integer()).
len(#s { name = QueueName }) ->
% rabbit_log:info("len(~p) ->", [Q]),
Result = riak_msg_store:len(?MSG_STORE, QueueName),
rabbit_log:info(" len() -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% is_empty/1 returns 'true' if the queue is empty, and 'false'
%% otherwise.
%%
%% -spec(is_empty/1 :: (state()) -> boolean()).
is_empty(#s { name = QueueName }) ->
% rabbit_log:info("is_empty(~p)", [Q]),
0 == riak_msg_store:len(?MSG_STORE, QueueName).
% rabbit_log:info(" -> ~p", [Result]),
% Result.
%%----------------------------------------------------------------------------
%% For the next two functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
%% the messages held in RAM given the current rates. If you want to
%% ignore all of this stuff, then do so, and return 0 in
%% ram_duration/1.
%% set_ram_duration_target states that the target is to have no more
%% messages in RAM than indicated by the duration and the current
%% queue rates.
%%
%% This function should be called only from outside this module.
%%
%% -spec(set_ram_duration_target/2 ::
%% (('undefined' | 'infinity' | number()), state())
%% -> state()).
set_ram_duration_target(_, S) ->
rabbit_log:info("set_ram_duration_target(_~p) ->", [S]),
Result = S,
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% ram_duration/1 optionally recalculates the duration internally
%% (likely to be just update your internal rates), and report how many
%% seconds the messages in RAM represent given the current rates of
%% the queue.
%%
%% This function should be called only from outside this module.
%%
%% -spec(ram_duration/1 :: (state()) -> {number(), state()}).
ram_duration(S) ->
rabbit_log:info("ram_duration(~p) ->", [S]),
Result = {0, S},
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% needs_idle_timeout/1 returns 'true' if 'idle_timeout' should be
%% called as soon as the queue process can manage (either on an empty
%% mailbox, or when a timer fires), and 'false' otherwise.
%%
%% This function should be called only from outside this module.
%%
%% -spec(needs_idle_timeout/1 :: (state()) -> boolean()).
needs_idle_timeout(_) ->
rabbit_log:info("needs_idle_timeout(_) ->"),
Result = false,
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% idle_timeout/1 is called (eventually) after needs_idle_timeout returns
%% 'true'. Note this may be called more than once for each 'true'
%% returned from needs_idle_timeout.
%%
%% This function should be called only from outside this module.
%%
%% -spec(idle_timeout/1 :: (state()) -> state()).
idle_timeout(S) ->
rabbit_log:info("idle_timeout(~p) ->", [S]),
Result = S,
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% handle_pre_hibernate/1 is called immediately before the queue
%% hibernates.
%%
%% This function should be called only from outside this module.
%%
%% -spec(handle_pre_hibernate/1 :: (state()) -> state()).
handle_pre_hibernate(S) ->
Result = S,
rabbit_log:info("handle_pre_hibernate(~p) ->", [S]),
rabbit_log:info(" -> ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% status/1 exists for debugging purposes, to be able to expose state
%% via rabbitmqctl list_queues backing_queue_status
%%
%% This function should be called only from outside this module.
%%
%% -spec(status/1 :: (state()) -> [{atom(), any()}]).
status(#s { q = Q,
next_seq_id = NextSeqId,
pending_ack_dict = PAD }) ->
rabbit_log:info("status(_) ->"),
Result = [{len, queue:len(Q)},
{next_seq_id, NextSeqId},
{acks, dict:size(PAD)}],
rabbit_log:info(" ~p", [Result]),
Result.
%%----------------------------------------------------------------------------
%% Various helpers
%%----------------------------------------------------------------------------
-spec(dropwhile_state/2 ::
(fun ((rabbit_types:message_properties()) -> boolean()), s())
-> {empty | ok, s()}).
dropwhile_state(Pred, S) ->
internal_queue_out(
fun (M = #m { props = Props }, Si = #s { q = Q }) ->
case Pred(Props) of
true ->
{_, Si1} = internal_fetch(false, M, Si),
dropwhile_state(Pred, Si1);
false -> {ok, Si #s {q = queue:in_r(M, Q) }}
end
end,
S).
-spec(internal_ack/2 :: ([seq_id()], s()) -> {[rabbit_guid:guid()], s()}).
internal_ack(SeqIds, S) ->
internal_ack3(fun (_, Si) -> Si end, SeqIds, S).
-spec tx_commit_state([rabbit_types:basic_message()],
[seq_id()],
message_properties_transformer(),
s()) ->
s().
tx_commit_state(Pubs, SeqIds, PropsF, S) ->
{_, S1} = internal_ack(SeqIds, S),
lists:foldl(
fun ({Msg, Props}, Si) -> publish_state(Msg, Props, false, Si) end,
S1,
[{Msg, PropsF(Props)} || {Msg, Props} <- lists:reverse(Pubs)]).
-spec m(rabbit_types:basic_message(),
seq_id(),
rabbit_types:message_properties()) ->
m().
m(Msg, SeqId, Props) ->
#m { seq_id = SeqId, msg = Msg, props = Props, is_delivered = false }.
-spec lookup_tx(rabbit_types:txn(), state()) -> tx().
lookup_tx(Txn, #s { txn_dict = TxnDict }) ->
case dict:find(Txn, TxnDict) of
error -> #tx { to_pub = [], to_ack = [] };
{ok, Tx} -> Tx
end.
-spec store_tx(rabbit_types:txn(), tx(), state()) -> state().
store_tx(Txn, Tx, S = #s { txn_dict = TxnDict }) ->
S #s { txn_dict = dict:store(Txn, Tx, TxnDict) }.
-spec erase_tx(rabbit_types:txn(), state()) -> state().
erase_tx(Txn, S = #s { txn_dict = TxnDict }) ->
S #s { txn_dict = dict:erase(Txn, TxnDict) }.
-spec publish_state(rabbit_types:basic_message(),
rabbit_types:message_properties(),
boolean(),
s()) -> s().
publish_state(Msg,
Props,
IsDelivered,
S = #s { next_seq_id = SeqId, name = QueueName }) ->
case IsDelivered of
false -> ok = riak_msg_store:push(?MSG_STORE, QueueName, SeqId, Msg, Props)
end,
S #s { next_seq_id = SeqId + 1 }.
-spec record_pending_ack_state(m(), s()) -> s().
record_pending_ack_state(M = #m { seq_id = SeqId },
S = #s { pending_ack_dict = PAD }) ->
S #s { pending_ack_dict = dict:store(SeqId, M, PAD) }.
% -spec remove_acks_state(s()) -> s().
remove_acks_state(S = #s { pending_ack_dict = PAD }) ->
_ = dict:fold(fun (_, M, Acc) -> [m_guid(M) | Acc] end, [], PAD),
S #s { pending_ack_dict = dict:new() }.
-spec internal_ack3(fun (([rabbit_guid:guid()], s()) -> s()),
[rabbit_guid:guid()],
s()) -> {[rabbit_guid:guid()], s()}.
internal_ack3(_, [], S) -> {[], S};
internal_ack3(F, SeqIds, S) ->
{AllGuids, S1} =
lists:foldl(
fun (SeqId, {Acc, Si = #s { pending_ack_dict = PAD }}) ->
M = dict:fetch(SeqId, PAD),
{[m_guid(M) | Acc],
F(M, Si #s { pending_ack_dict = dict:erase(SeqId, PAD)})}
end,
{[], S},
SeqIds),
{lists:reverse(AllGuids), S1}.
-spec m_guid(m()) -> rabbit_guid:guid().
m_guid(#m { msg = #basic_message { guid = Guid }}) -> Guid.
=ERROR REPORT==== 31-Jan-2011::08:00:01 ===
** Generic server <0.204.0> terminating
** Last message in was {notify_down,<0.223.0>}
** When Server state == {q,
{amqqueue,
{resource,<<"/">>,queue,<<"test">>},
false,false,none,[],<0.204.0>},
none,true,rabbit_riak_queue,
{s,undefined,2,
{dict,2,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],
[["0"|
{m,"0",
{basic_message,undefined,undefined,
{content,60,
{'P_basic',"application/json",undefined,
{dict,5,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],[]},
{{[],
[["X-Riak-Meta-Bar",98,97,122]],
[],[],[],
[["X-Riak-Meta-Guid",52,82,56,50,56,43,
65,71,112,113,69,115,49,120,87,116,57,
57,43,120,97,103,61,61]],
[],[],[],
[["X-Riak-Meta-Queue",116,101,115,116]],
[],[],
[["X-Riak-Meta-Foo",55]],
[],[],
[["X-Riak-Meta-Classid",54,48]]}}},
undefined,undefined,undefined,undefined,
undefined,undefined,undefined,undefined,
undefined,undefined,undefined},
undefined,undefined,
[<<"{\"test\":\"value\", \"int\": 14}">>]},
<<225,31,54,243,224,6,166,161,44,215,21,173,
247,223,177,106>>,
undefined},
{message_properties,undefined,false},
true}]],
[],[],[],[],
[["1"|
{m,"1",
{basic_message,undefined,undefined,
{content,60,
{'P_basic',"application/json",undefined,
{dict,5,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[],[]},
{{[],
[["X-Riak-Meta-Bar",98,97,122]],
[],[],[],
[["X-Riak-Meta-Guid",117,75,101,54,108,
87,72,108,68,77,108,48,106,79,54,79,
71,54,71,111,122,65,61,61]],
[],[],[],
[["X-Riak-Meta-Queue",116,101,115,116]],
[],[],
[["X-Riak-Meta-Foo",55]],
[],[],
[["X-Riak-Meta-Classid",54,48]]}}},
undefined,undefined,undefined,undefined,
undefined,undefined,undefined,undefined,
undefined,undefined,undefined},
undefined,undefined,
[<<"{\"test\":\"value\", \"int\": 14}">>]},
<<184,167,186,149,97,229,12,201,116,140,238,
142,27,161,168,204>>,
undefined},
{message_properties,undefined,false},
true}]],
[],[],[]}}},
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[]}}},
{resource,<<"/">>,queue,<<"test">>}},
{[{<0.223.0>,
{consumer,
<<"amq.ctag-Dhi6u7GHGYVwkIMH3/ctDg==">>,true}}],
[]},
{[],[]},
undefined,undefined,
{1296482404018690,#Ref<0.0.0.665>},
undefined,
{state,none,undefined},
undefined,undefined}
** Reason for termination ==
** {{badrecord,message_properties},
[{rabbit_amqqueue_process,'-reset_msg_expiry_fun/1-fun-0-',2},
{rabbit_riak_queue,'-requeue/3-fun-0-',3},
{rabbit_riak_queue,'-internal_ack3/3-fun-0-',3},
{lists,foldl,3},
{rabbit_riak_queue,internal_ack3,3},
{rabbit_riak_queue,requeue,3},
{rabbit_amqqueue_process,maybe_run_queue_via_backing_queue,2},
{rabbit_amqqueue_process,handle_ch_down,2}]}
=ERROR REPORT==== 31-Jan-2011::08:00:01 ===
** Generic server <0.223.0> terminating
** Last message in was {'$gen_cast',
{deliver,<<"amq.ctag-Dhi6u7GHGYVwkIMH3/ctDg==">>,
true,
{{resource,<<"/">>,queue,<<"test">>},
<0.204.0>,"0",true,
{basic_message,undefined,undefined,
{content,60,
{'P_basic',"application/json",undefined,
{dict,5,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[]},
{{[],
[["X-Riak-Meta-Bar",98,97,122]],
[],[],[],
[["X-Riak-Meta-Guid",52,82,56,50,56,43,65,71,
112,113,69,115,49,120,87,116,57,57,43,120,
97,103,61,61]],
[],[],[],
[["X-Riak-Meta-Queue",116,101,115,116]],
[],[],
[["X-Riak-Meta-Foo",55]],
[],[],
[["X-Riak-Meta-Classid",54,48]]}}},
undefined,undefined,undefined,undefined,
undefined,undefined,undefined,undefined,
undefined,undefined,undefined},
undefined,undefined,
[<<"{\"test\":\"value\", \"int\": 14}">>]},
<<225,31,54,243,224,6,166,161,44,215,21,173,247,223,
177,106>>,
undefined}}}}
** When Server state == {ch,running,1,<0.220.0>,<0.222.0>,undefined,
#Fun<rabbit_channel_sup.0.58069862>,none,
{set,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
1,
{[],[]},
{[],[]},
<<"guest">>,<<"/">>,<<"test">>,
{dict,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],
[[<<"amq.ctag-Dhi6u7GHGYVwkIMH3/ctDg==">>|
{resource,<<"/">>,queue,<<"test">>}]],
[],[],[],[],[],[],[],[],[]}}},
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
<0.218.0>,
{state,none,undefined}}
** Reason for termination ==
** {{badrecord,resource},
[{rabbit_channel,internal_deliver,5},
{rabbit_channel,handle_cast,2},
{gen_server2,handle_msg,2},
{proc_lib,init_p_do_apply,3}]}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment