Skip to content

Instantly share code, notes, and snippets.

@shino
Last active August 29, 2015 14:09
Show Gist options
  • Save shino/f30eb66d8a53b8d71224 to your computer and use it in GitHub Desktop.
Save shino/f30eb66d8a53b8d71224 to your computer and use it in GitHub Desktop.
Handoff and siblings

Riak repository

Remote:   1.4 @ origin (git://github.com/basho/riak.git)
Local:    1.4 ~/b/g/riak-1.4-handoff-siblings/
Head:     afdb937 Finalize release notes for 1.4.10

Config changes

	Modified   rel/files/app.config
diff --git a/rel/files/app.config b/rel/files/app.config
index a646e78..70ebc18 100644
--- a/rel/files/app.config
+++ b/rel/files/app.config
@@ -17,12 +17,15 @@
 
  %% Riak Core config
  {riak_core, [
+              {default_bucket_props, [{allow_mult, true}]},
+              {handoff_concurrency, 1},
+
               %% Default location of ringstate
               {ring_state_dir, "{{ring_state_dir}}"},
 
               %% Default ring creation size.  Make sure it is a power of 2,
               %% e.g. 16, 32, 64, 128, 256, 512 etc
-              %{ring_creation_size, 64},
+              {ring_creation_size, 16},
 
               %% http is a list of IP addresses and TCP ports that the Riak
               %% HTTP interface will bind.
@@ -78,6 +81,8 @@
 
  %% Riak KV config
  {riak_kv, [
+            {warn_siblings, 1},
+
             %% Storage_backend specifies the Erlang module defining the storage
             %% mechanism that will be used on this node.
             {storage_backend, riak_kv_bitcask_backend},
@@ -90,7 +95,7 @@
             %% Enable active anti-entropy subsystem + optional debug messages:
             %%   {anti_entropy, {on|off, []}},
             %%   {anti_entropy, {on|off, [debug]}},
-            {anti_entropy, {on, []}},
+            {anti_entropy, {off, []}},
 
             %% Restrict how fast AAE can build hash trees. Building the tree
             %% for a given partition requires a full scan over that partition's

Evil sleep in riak_core

Local:    (detached) ~/b/g/riak-1.4-handoff-siblings/deps/riak_core/
Head:     c769123 Roll version riak_core 1.4.10

Changes:
	Modified   src/riak_core_handoff_sender.erl
diff --git a/src/riak_core_handoff_sender.erl b/src/riak_core_handoff_sender.erl
index 347563b..0c8067f 100644
--- a/src/riak_core_handoff_sender.erl
+++ b/src/riak_core_handoff_sender.erl
@@ -307,6 +307,8 @@ visit_item(K, V, Acc = #ho_acc{ack = _AccSyncThreshold, acksync_threshold = _Acc
             Acc#ho_acc{ack=0, error={error, Reason}, stats=Stats3}
     end;
 visit_item(K, V, Acc) ->
+    %% Emulate slooooow transfer and/or large data set to send/recv
+    timer:sleep(10000),
     #ho_acc{filter=Filter,
             module=Module,
             total_objects=TotalObjects,

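Set up a 2-node cluster. Three dev nodes are built and started here, but only dev1 and dev2 are clustered at this point; dev3 joins in a later step.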

% DEVNODES=3 make stagedevrel
% for i in {1..3}; do dev/dev${i}/bin/riak start & ; done
% dev/dev2/bin/riak-admin cluster join dev1@127.0.0.1
% dev/dev1/bin/riak-admin cluster plan
% dev/dev1/bin/riak-admin cluster commit

Put some data. The script handoff_and_siblings.erl is included in this gist. This example runs it from a cloned and built riak_cs directory, but it only requires riak-erlang-client and its dependencies.

% ERL_LIBS=deps escript /path/to/handoff_and_siblings.erl

Add one more node to the cluster.

% dev/dev3/bin/riak-admin cluster join dev1@127.0.0.1
% dev/dev1/bin/riak-admin cluster plan
% dev/dev1/bin/riak-admin cluster commit

Put some more data.

% ERL_LIBS=deps escript /path/to/handoff_and_siblings.erl

Then look for "many siblings" warnings in the console log.

% grep 'many siblings' dev/dev3/log/console.log
-module(handoff_and_siblings).
-compile([export_all]).
-define(BUCKET, <<"b">>).
-define(KEY_PREFIX, <<"k">>).
%% Escript entry point.
%%
%% Opens two protocol-buffers client connections to 127.0.0.1, on ports
%% 10017 and 10027 (PortBase + 10 and PortBase + 20), and runs the workload
%% against ?BUCKET using ?KEY_PREFIX for key names.
%% NOTE(review): these are presumably the PB ports of dev1 and dev2 in a
%% devrel build -- confirm against the nodes' app.config.
%%
%% Command-line arguments are accepted but ignored. Returns ok; crashes with
%% badmatch if either connection cannot be started.
main(_Args) ->
    Localhost = {127,0,0,1},
    BasePort = 10007,
    {ok, Client1} = riakc_pb_socket:start_link(Localhost, BasePort + 10),
    {ok, Client2} = riakc_pb_socket:start_link(Localhost, BasePort + 20),
    start(Client1, Client2, ?BUCKET, ?KEY_PREFIX),
    ok.
%% Runs the whole workload over a fixed number of keys (100):
%% first the interleaved two-client updates, then the repeated
%% single-client updates on the same keys. Always returns ok.
start(Riakc1, Riakc2, Bucket, KeyPrefix) ->
    NumKeys = 100,
    concurrent_update(Riakc1, Riakc2, Bucket, KeyPrefix, NumKeys),
    single_process_updates(Riakc1, Bucket, KeyPrefix, NumKeys),
    ok.
%% Executes a single pair of interleaved read-modify-writes on each key,
%% for keys "<KeyPrefix>-<KeyCount>" down to "<KeyPrefix>-1":
%%
%%   client1: get       put
%%   client2:      get       put
%%
%% Both clients fetch the object before either one writes it back, so the
%% two puts are based on the same fetched state. The key is (re)initialized
%% through Riakc1 first. The results of the two puts are not checked.
%% Returns ok once KeyCount reaches 0.
concurrent_update(_Riakc1, _Riakc2, _Bucket, _KeyPrefix, 0) ->
    ok;
concurrent_update(Riakc1, Riakc2, Bucket, KeyPrefix, KeyCount) ->
    KeyChars = io_lib:format("~s-~B", [KeyPrefix, KeyCount]),
    Key = list_to_binary(KeyChars),
    initialize_obj(Riakc1, Bucket, Key),
    %% Both reads happen before either write.
    {ok, Seen1} = riakc_pb_socket:get(Riakc1, Bucket, Key),
    {ok, Seen2} = riakc_pb_socket:get(Riakc2, Bucket, Key),
    PutOpts = [{w, 2}],
    riakc_pb_socket:put(Riakc1, update_obj(Seen1, 1), PutOpts),
    riakc_pb_socket:put(Riakc2, update_obj(Seen2, -1), PutOpts),
    concurrent_update(Riakc1, Riakc2, Bucket, KeyPrefix, KeyCount - 1).
%% Performs 15 sequential read-modify-write cycles through a single client
%% on each key, for keys "<KeyPrefix>-<KeyCount>" down to "<KeyPrefix>-1".
%% Each cycle fetches the current object and writes it back with the cycle
%% number as its value. The put results are not checked; a failed get
%% crashes with badmatch. Returns ok once KeyCount reaches 0.
single_process_updates(_Riakc, _Bucket, _KeyPrefix, 0) ->
    ok;
single_process_updates(Riakc, Bucket, KeyPrefix, KeyCount) ->
    UpdatesPerKey = 15,
    Key = list_to_binary(io_lib:format("~s-~B", [KeyPrefix, KeyCount])),
    RewriteOnce =
        fun(Round) ->
            {ok, Current} = riakc_pb_socket:get(Riakc, Bucket, Key),
            riakc_pb_socket:put(Riakc, update_obj(Current, Round))
        end,
    lists:foreach(RewriteOnce, lists:seq(1, UpdatesPerKey)),
    single_process_updates(Riakc, Bucket, KeyPrefix, KeyCount - 1).
%% Resets the object at Bucket/Key to the initial value and stores it.
%%
%% If the key already exists, the fetched object is rewritten via
%% update_obj/2; if it does not exist, a fresh object is created. In both
%% cases the stored value is the binary form of 0 (see to_bin/1), so the
%% object looks the same whether or not the key existed beforehand.
%%
%% Returns ok. Crashes with case_clause on any get result other than
%% {ok, Obj} or {error, notfound}, and with badmatch if the put fails.
initialize_obj(Riakc, Bucket, Key) ->
    InitialValue = 0,
    InitialObj =
        case riakc_pb_socket:get(Riakc, Bucket, Key) of
            {ok, Obj} ->
                update_obj(Obj, InitialValue);
            {error, notfound} ->
                %% Convert explicitly so this branch stores the same binary
                %% that update_obj/2 would, rather than a raw integer.
                riakc_obj:new(Bucket, Key, to_bin(InitialValue))
        end,
    ok = riakc_pb_socket:put(Riakc, InitialObj).
%% Returns Old with its update value set to the binary form of NewValue
%% (see to_bin/1) and its update metadata set to the first entry returned
%% by riakc_obj:get_metadatas/1.
%% NOTE(review): picking one metadata is presumably what allows an object
%% that carries siblings to be written back as a single value -- confirm
%% against the riakc_obj documentation.
%% Crashes with badmatch if the object has no metadata entries.
update_obj(Old, NewValue) ->
    Encoded = to_bin(NewValue),
    WithValue = riakc_obj:update_value(Old, Encoded),
    [ChosenMD | _Others] = riakc_obj:get_metadatas(WithValue),
    riakc_obj:update_metadata(WithValue, ChosenMD).
%% Converts a value to a binary.
%%   integer -> its decimal text, e.g. -1 becomes <<"-1">>
%%   list    -> list_to_binary/1 of the list
%%   binary  -> returned unchanged
%% Any other term crashes with function_clause.
to_bin(Int) when is_integer(Int) ->
    list_to_binary(integer_to_list(Int));
to_bin(Chars) when is_list(Chars) ->
    list_to_binary(Chars);
to_bin(Bin) when is_binary(Bin) ->
    Bin.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment