Skip to content

Instantly share code, notes, and snippets.

@hairyhum
Last active May 8, 2019 14:44
Show Gist options
  • Save hairyhum/523e07932d1f82004138fa7bb1c76290 to your computer and use it in GitHub Desktop.
Save hairyhum/523e07932d1f82004138fa7bb1c76290 to your computer and use it in GitHub Desktop.
Simple perf test for rabbitmq schema storage.
-module(schema_test).
-export([declare_queues/3, add_binding/3]).
-include_lib("amqp_client/include/amqp_client.hrl").
% -include_lib("rabbit_common/include/rabbit.hrl").
-define(QNAME, <<"foo">>).
-define(XNAME, <<"foo">>).
%% This module contains functions, which create RabbitMQ entities in parallel
%% processes. Can be used to measure schema database performance.
%% This function declares queues and binds them to exchange "foo"
%% PerProc - number of queues, sequentially created by each process
%% NProcs - number of parallel processes
%% Tag - a suffix to add to queue names. If run twice with the same tag -
%% new queues will not be created.
declare_queues(PerProc, NProcs, Tag) ->
Self = self(),
Pids = [spawn_link(fun() -> do_declare_queues(PerProc, N, Self, Tag) end)
|| N <- lists:seq(1, NProcs)],
collect_responses(Pids).
do_declare_queues(PerProc, ProcN, Pid, Tag) ->
Exchange = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"foo">>},
Queue = #resource{virtual_host = <<"/">>, kind = queue},
[begin
Name = <<"QUEUE_", (integer_to_binary(ProcN))/binary, "_", (integer_to_binary(N))/binary, Tag/binary>>,
rabbit_amqqueue:declare(Queue#resource{name = Name}, true, false, [], none, <<"none">>),
rabbit_binding:add(#binding{source = Exchange, destination = Queue#resource{name = Name}, key = <<"key">>}, <<"none">>)
end ||
N <- lists:seq(1, PerProc)],
Pid ! {finished, self()}.
%% This function creates bindings between queue "foo" and exchange "foo"
%% PerProc - number of bindings, sequentially created by each process
%% NProcs - number of parallel processes
%% Tag - a suffix to add to routing keys. If run twice with the same tag -
%% new bindings will not be created.
add_bindings(PerProc, NProcs, Tag) ->
Self = self(),
Pids = [spawn_link(fun() -> do_add_bindings(PerProc, N, Self, Tag) end)
|| N <- lists:seq(1, NProcs)],
collect_responses(Pids).
do_add_bindings(PerProc, ProcN, Pid, Tag) ->
Resource = #resource{virtual_host = <<"/">>, name = <<"foo">>},
[begin
RoutingKey = <<"RK_", (integer_to_binary(ProcN))/binary, "_", (integer_to_binary(N))/binary, Tag/binary>>,
Binging = #binding{source = Resource#resource{kind = exchange},
destination = Resource#resource{kind = queue},
key = RoutingKey},
rabbit_binding:add(Binging, <<"none">>)
end ||
N <- lists:seq(1, PerProc)],
Pid ! {finished, self()}.
collect_responses(Pids) -> collect_responses(Pids, []).
collect_responses([], Processed) ->
Processed;
collect_responses(Pids, Processed) ->
receive {finished, Pid} -> collect_responses(Pids -- [Pid], [Pid | Processed])
after 500000 -> error(timeout)
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment