Skip to content

Instantly share code, notes, and snippets.

@benmmurphy
Created February 19, 2018 15:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save benmmurphy/66e2498ba27ca9a46fd29e89f43402b6 to your computer and use it in GitHub Desktop.
Save benmmurphy/66e2498ba27ca9a46fd29e89f43402b6 to your computer and use it in GitHub Desktop.
collect_acks
-module(collect_acks_bench).
-export([bench_fifo/0, bench_lifo/0, bench_multiple/1]).
%% Benchmark stand-in for the channel's protocol-error helper.
%% Throws the format string `Format' unchanged; the argument list is
%% ignored (no formatting is performed). Never returns normally.
precondition_failed(Format, _Args) ->
    erlang:throw(Format).
%% Split the unacked-message queue `Q' into {Acked, Remaining}.
%%
%% With delivery tag 0 and Multiple = true every queued message is acked:
%% the whole queue is returned as a list and the remaining queue is empty.
%% Any other combination is handled by the collect_acks/5 scan.
%%
%% NB: the acked list is in youngest-first order (reverse of queue order).
collect_acks(Q, 0, true) ->
    EverythingYoungestFirst = lists:reverse(queue:to_list(Q)),
    {EverythingYoungestFirst, queue:new()};
collect_acks(Q, DeliveryTag, Multiple) ->
    collect_acks([], [], Q, DeliveryTag, Multiple).
%% Walk the queue from the front looking for `DeliveryTag'.
%%
%%   ToAcc     - messages selected for acking so far (newest at the head)
%%   PrefixAcc - messages skipped so far (newest at the head); only grows
%%               when Multiple is not true
%%   Q         - the part of the queue not yet examined
%%
%% Returns {Acked, Remaining} once the tag is found. If the queue runs out
%% first, precondition_failed/2 is called with "unknown delivery tag ~w".
collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
    case queue:out(Q) of
        {empty, _} ->
            precondition_failed("unknown delivery tag ~w", [DeliveryTag]);
        {{value, {Tag, _ConsumerTag, _Msg} = Unacked}, Rest}
          when Tag == DeliveryTag ->
            %% Found it: put any skipped messages back in front of the
            %% unexamined tail, restoring their original order.
            Remaining =
                case PrefixAcc of
                    [] ->
                        Rest;
                    _ ->
                        Skipped = queue:from_list(lists:reverse(PrefixAcc)),
                        queue:join(Skipped, Rest)
                end,
            {[Unacked | ToAcc], Remaining};
        {{value, {_Tag, _ConsumerTag, _Msg} = Unacked}, Rest}
          when Multiple ->
            %% Multiple ack: everything ahead of the tag is acked as well.
            collect_acks([Unacked | ToAcc], PrefixAcc,
                         Rest, DeliveryTag, Multiple);
        {{value, {_Tag, _ConsumerTag, _Msg} = Unacked}, Rest} ->
            %% Single ack: remember this message so it can be re-queued.
            collect_acks(ToAcc, [Unacked | PrefixAcc],
                         Rest, DeliveryTag, Multiple)
    end.
%% Call the zero-arity fun `F' exactly `N' times, discarding its results,
%% then return `ok'.
%%
%% `N' must be a non-negative integer. Any other value fails immediately
%% with `function_clause' instead of counting down past zero forever.
repeat(0, _F) ->
    ok;
repeat(N, F) when is_integer(N), N > 0 ->
    F(),
    repeat(N - 1, F).
%% Benchmark: ack 8000 messages one at a time in the same order they were
%% enqueued (each ack hits the front of the queue), repeated 1000 times
%% from the same starting queue.
%%
%% Returns timer:tc/1's {Microseconds, ok}.
bench_fifo() ->
    Tags = lists:seq(1, 8000),
    Enqueue = fun(Tag, Queue) -> queue:in({Tag, ctag, msg}, Queue) end,
    Unacked = lists:foldl(Enqueue, queue:new(), Tags),
    AckOne = fun(Tag, Queue) ->
                     {_Acked, Remaining} = collect_acks(Queue, Tag, false),
                     Remaining
             end,
    OnePass = fun() -> lists:foldl(AckOne, Unacked, Tags) end,
    timer:tc(fun() -> repeat(1000, OnePass) end).
%% Benchmark: a single multiple-ack up to delivery tag `N' against a queue
%% holding tags 1..8000, repeated 1000 times on the same starting queue.
%%
%% Returns timer:tc/1's {Microseconds, ok}.
bench_multiple(N) ->
    Tags = lists:seq(1, 8000),
    Enqueue = fun(Tag, Queue) -> queue:in({Tag, ctag, msg}, Queue) end,
    Unacked = lists:foldl(Enqueue, queue:new(), Tags),
    AckUpToN = fun() -> collect_acks(Unacked, N, true) end,
    timer:tc(fun() -> repeat(1000, AckUpToN) end).
%% Workload: ack 8000 messages one at a time in reverse enqueue order, so
%% every ack has to scan to the current end of the queue.
%%
%% Unlike bench_fifo/0 this performs a single pass and does not time
%% itself; it returns the queue left after all acks. Wrap the call in
%% timer:tc to obtain a measurement.
bench_lifo() ->
    Acks = lists:seq(1, 8000),
    Q = lists:foldl(fun(E, Acc) ->
                            queue:in({E, ctag, msg}, Acc)
                    end, queue:new(), Acks),
    lists:foldl(fun(E, Acc) ->
                        %% The acked list is not needed, only the remainder.
                        {_Result, NewQueue} = collect_acks(Acc, E, false),
                        NewQueue
                end, Q, lists:reverse(Acks)).
-module(collect_acks_bench_new).
-export([bench_fifo/0, bench_lifo/0, bench_multiple/1]).
%% Benchmark stand-in for the channel's protocol-error helper.
%% Throws the format string `Format' unchanged; the argument list is
%% ignored (no formatting is performed). Never returns normally.
precondition_failed(Format, _Args) ->
    erlang:throw(Format).
%% Split the unacked-message tree `Q' (keyed by delivery tag) into
%% {Acked, Remaining}.
%%
%% Multiple = true  : delegates to collect_acks_multiple/3.
%% Multiple = false : removes exactly the entry for `DeliveryTag'; if the
%%                    tag is not present, precondition_failed/2 is called
%%                    with "unknown delivery tag ~w".
%%
%% TODO: implement DeliveryTag==0 fast path?
collect_acks(Q, DeliveryTag, true) ->
    collect_acks_multiple([], Q, DeliveryTag);
collect_acks(Q, DeliveryTag, false) ->
    case take_any(DeliveryTag, Q) of
        {Acked, Remaining} ->
            {[Acked], Remaining};
        error ->
            precondition_failed("unknown delivery tag ~w", [DeliveryTag])
    end.
%% Backport of gb_trees:take_any/2, which is only available from OTP 20.
%%
%% Returns {Value, TreeWithoutKey} when `Key' is present in `Tree', or the
%% atom `error' when it is not.
take_any(Key, Tree) ->
    %% A single lookup both tests for presence and fetches the value,
    %% avoiding a separate is_defined + get traversal.
    case gb_trees:lookup(Key, Tree) of
        {value, Value} ->
            {Value, gb_trees:delete(Key, Tree)};
        none ->
            error
    end.
%% Repeatedly remove the smallest-keyed entry from `Q' while its key is
%% =< `DeliveryTag', pushing each removed value onto `ToAcc'.
%%
%% Returns {Acked, Remaining}; Acked ends up with the largest acked tag at
%% the head. If the tag does not exist we still ack every tag smaller than
%% the specified one, and no error is raised.
collect_acks_multiple(ToAcc, Q, DeliveryTag) ->
    Smallest =
        case gb_trees:is_empty(Q) of
            true -> none;
            false -> gb_trees:take_smallest(Q)
        end,
    case Smallest of
        {Tag, UnackedMessage, Rest} when Tag =< DeliveryTag ->
            collect_acks_multiple([UnackedMessage | ToAcc], Rest, DeliveryTag);
        _ ->
            %% Either nothing is left or the next tag is past the limit:
            %% hand back the tree as it was before the peek.
            {ToAcc, Q}
    end.
%% Call the zero-arity fun `F' exactly `N' times, discarding its results,
%% then return `ok'.
%%
%% `N' must be a non-negative integer. Any other value fails immediately
%% with `function_clause' instead of counting down past zero forever.
repeat(0, _F) ->
    ok;
repeat(N, F) when is_integer(N), N > 0 ->
    F(),
    repeat(N - 1, F).
%% Benchmark: ack 8000 messages one at a time in ascending tag order
%% against a gb_tree keyed by delivery tag, repeated 1000 times from the
%% same starting tree.
%%
%% Returns timer:tc/1's {Microseconds, ok}.
bench_fifo() ->
    Tags = lists:seq(1, 8000),
    Store = fun(Tag, Tree) -> gb_trees:insert(Tag, {Tag, ctag, msg}, Tree) end,
    Unacked = lists:foldl(Store, gb_trees:empty(), Tags),
    AckOne = fun(Tag, Tree) ->
                     {_Acked, Remaining} = collect_acks(Tree, Tag, false),
                     Remaining
             end,
    OnePass = fun() -> lists:foldl(AckOne, Unacked, Tags) end,
    timer:tc(fun() -> repeat(1000, OnePass) end).
%% Benchmark: a single multiple-ack up to delivery tag `N' against a
%% gb_tree holding tags 1..8000, repeated 1000 times on the same starting
%% tree.
%%
%% Returns timer:tc/1's {Microseconds, ok}.
bench_multiple(N) ->
    Acks = lists:seq(1, 8000),
    Q = lists:foldl(fun(E, Acc) ->
                            gb_trees:insert(E, {E, ctag, msg}, Acc)
                    end, gb_trees:empty(), Acks),
    timer:tc(fun() ->
                     repeat(1000, fun() ->
                                          %% Match only to assert the result
                                          %% shape; neither part is used.
                                          {_Result, _NewQueue} = collect_acks(Q, N, true)
                                  end)
             end).
%% Workload: ack 8000 messages one at a time in descending tag order
%% against a gb_tree keyed by delivery tag.
%%
%% Unlike bench_fifo/0 this performs a single pass and does not time
%% itself; it returns the tree left after all acks. Wrap the call in
%% timer:tc to obtain a measurement.
bench_lifo() ->
    Acks = lists:seq(1, 8000),
    Q = lists:foldl(fun(E, Acc) ->
                            gb_trees:insert(E, {E, ctag, msg}, Acc)
                    end, gb_trees:empty(), Acks),
    lists:foldl(fun(E, Acc) ->
                        %% The acked list is not needed, only the remainder.
                        {_Result, NewQueue} = collect_acks(Acc, E, false),
                        NewQueue
                end, Q, lists:reverse(Acks)).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment