Skip to content

Instantly share code, notes, and snippets.

@Techmind
Created April 17, 2012 14:28
Show Gist options
  • Save Techmind/2406295 to your computer and use it in GitHub Desktop.
Save Techmind/2406295 to your computer and use it in GitHub Desktop.
2dem
%% -------------------------------------------------------------------
%%
%% basho_bench: Benchmarking Suite
%%
%% Copyright (c) 2009-2010 Basho Techonologies
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(basho_bench_driver_riakclient).
-export([new/1,
run/4, map/3, map2/3, reduce/2]).
-include("basho_bench.hrl").
-record(state, { client,
bucket,
replies, index_range, index_slice }).
%% ====================================================================
%% API
%% ====================================================================
new(Id) ->
%% Make sure the path is setup such that we can get at riak_client
case code:which(riak_client) of
non_existing ->
?FAIL_MSG("~s requires riak_client module to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
Nodes = basho_bench_config:get(riakclient_nodes),
Cookie = basho_bench_config:get(riakclient_cookie, 'riak'),
MyNode = basho_bench_config:get(riakclient_mynode, [basho_bench, longnames]),
Replies = basho_bench_config:get(riakclient_replies, 2),
Bucket = basho_bench_config:get(riakclient_bucket, <<"test">>),
IndexRange = basho_bench_config:get(riakclient_index_range, 300000),
IndexSlice = basho_bench_config:get(riakclient_index_slice, 4),
%% Try to spin up net_kernel
case net_kernel:start(MyNode) of
{ok, _} ->
?INFO("Net kernel started as ~p\n", [node()]);
{error, {already_started, _}} ->
ok;
{error, {{already_started, _}, _}} ->
ok;
{error, Reason} ->
?FAIL_MSG("Failed to start net_kernel for ~p: ~p\n", [?MODULE, Reason])
end,
%% Initialize cookie for each of the nodes
[true = erlang:set_cookie(N, Cookie) || N <- Nodes],
%% Try to ping each of the nodes
ping_each(Nodes),
{Module, Binary,File} = code:get_object_code(?MODULE),
lists:foreach(fun(Node) -> rpc:call(Node, code, purge, [Module]), rpc:call(Node, code, load_binary, [Module, File, Binary]) end, Nodes),
%% Choose the node using our ID as a modulus
TargetNode = lists:nth((Id rem length(Nodes)+1), Nodes),
?INFO("Using target node ~p for worker ~p\n", [TargetNode, Id]),
case riak:client_connect(TargetNode) of
{ok, Client} ->
{ok, #state { client = Client,
bucket = Bucket,
replies = Replies, index_range = IndexRange, index_slice = IndexSlice }};
{error, Reason2} ->
?FAIL_MSG("Failed get a riak:client_connect to ~p: ~p\n", [TargetNode, Reason2])
end.
run(get, KeyGen, _ValueGen, State) ->
Key = KeyGen(),
case (State#state.client):get(State#state.bucket, Key, State#state.replies) of
{ok, _} ->
{ok, State};
{error, notfound} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(put, KeyGen, ValueGen, State) ->
Value = ValueGen(),
Robj = riak_object:new(State#state.bucket, KeyGen(), ValueGen()),
IndexInt = element(1, Value),
IndexValue = <<IndexInt:32/big-unsigned-integer>>,
MetaData = dict:from_list([{<<"index">>, [{<<"index_bin">>, IndexValue}]}]),
%% Create the object...
Robj2 = riak_object:update_metadata(Robj, MetaData),
case (State#state.client):put(Robj2, State#state.replies) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(update, KeyGen, ValueGen, State) ->
Key = KeyGen(),
case (State#state.client):get(State#state.bucket, Key, State#state.replies) of
{ok, Robj} ->
Robj2 = riak_object:update_value(Robj, ValueGen()),
case (State#state.client):put(Robj2, State#state.replies) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
{error, notfound} ->
Robj = riak_object:new(State#state.bucket, Key, ValueGen()),
case (State#state.client):put(Robj, State#state.replies) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end
end;
run(mr_multiget, KeyGen, _ValueGen, State) ->
Keylist = make_keylist(State#state.bucket, KeyGen,
100),
case (State#state.client):mapred(Keylist,
[{map, {qfun, fun ?MODULE:map2/3}, none, true}
], 100000) of
{ok, _Result} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(range_2i, KeyGen, _ValueGen, State) ->
Start = make_int(KeyGen()),
End = Start + 100,
case (State#state.client):get_index(State#state.bucket, {range, <<"$key">>, int_to_bin(Start), int_to_bin(End)}, 100000) of
{ok, _Result} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(range_2i_index, KeyGen, _ValueGen, State) ->
Start = make_int(KeyGen()),
End = Start + State#state.index_slice,
case (State#state.client):get_index(State#state.bucket, {range, <<"index">>, int_to_bin(Start), int_to_bin(End)}, 100000) of
{ok, _Result} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(mr_group_erlang, KeyGen, _ValueGen, State) ->
Keylist = make_keylist(State#state.bucket, KeyGen,
100),
case (State#state.client):mapred(Keylist,
[{map, {qfun, fun ?MODULE:map/3}, none, false}
,{reduce, {qfun, fun(Gcounts, none) -> reduce(Gcounts, none) end}, none, true}
], 100000) of
{ok, _Result} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
run(delete, KeyGen, _ValueGen, State) ->
case (State#state.client):delete(State#state.bucket, KeyGen(), State#state.replies) of
ok ->
{ok, State};
{error, notfound} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
map2(Data, _A, _B) -> [Data].
map(Data, _A, _B) ->
case Data of {error, notfound} -> [];
_Else -> Object = riak_object:get_value(Data), [dict:from_list([{element(1,
if is_binary(Object) and bit_size(Object) > 128 -> binary_to_term(Object); true -> Object end), 1}])] end
.
reduce(Gcounts, none) ->
[
lists:foldl(fun(G, Acc) ->
dict:merge(fun(_, X, Y) -> X+Y end, G, Acc) end, dict:new(), Gcounts)]
.
make_keylist(_Bucket, _KeyGen, 0) ->
[];
make_keylist(Bucket, KeyGen, Count) ->
[{Bucket, KeyGen()}
|make_keylist(Bucket, KeyGen, Count-1)].
%% ====================================================================
%% Internal functions
%% ====================================================================
ping_each([]) ->
ok;
ping_each([Node | Rest]) ->
case net_adm:ping(Node) of
pong ->
ping_each(Rest);
pang ->
?FAIL_MSG("Failed to ping node ~p\n", [Node])
end.
make_int(Data) when is_binary(Data) ->
<<Int:32/big-unsigned-integer>> = Data,
Int
.
int_to_bin(Int) ->
<<Int:32/native>>
.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment