Skip to content

Instantly share code, notes, and snippets.

@cdahlqvist
Created June 29, 2013 09:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cdahlqvist/5890454 to your computer and use it in GitHub Desktop.
Save cdahlqvist/5890454 to your computer and use it in GitHub Desktop.
-module(robin_handoff).
-export([start/0,start/1,start/2,stop/0]).
-export([step/0]).
%% @doc Count the handoff transfers currently active on a node or on a
%% list of nodes.
%%
%% For a single node name (an atom) the `active' counts reported by
%% supervisor:count_children/1 for riak_core_handoff_receiver_sup and
%% riak_core_handoff_sender_sup are fetched over rpc and summed. For a
%% list, the per-node results are summed.
%%
%% Returns an integer, or the atom `reload' as soon as one node does not
%% answer with a property list containing `active' (for example when the
%% rpc call fails). Terms that are neither atoms nor lists are logged and
%% counted as 0.
active_count(Node) when is_atom(Node) ->
    Receivers = rpc:call(Node, supervisor, count_children,
                         [riak_core_handoff_receiver_sup]),
    Senders = rpc:call(Node, supervisor, count_children,
                       [riak_core_handoff_sender_sup]),
    lager:log(debug, self(), "Check Active: ~p~n~p~n~p", [Node, Receivers, Senders]),
    case is_list(Receivers) andalso proplists:is_defined(active, Receivers)
        andalso is_list(Senders) andalso proplists:is_defined(active, Senders) of
        true ->
            proplists:get_value(active, Receivers) +
                proplists:get_value(active, Senders);
        false ->
            %% Format arguments are passed as a list, like every other log
            %% call in this module.
            lager:log(info, self(), "invalid response from ~p", [Node]),
            reload
    end;
active_count([Node | Rest]) ->
    %% `reload' is not a number: it has to be propagated explicitly rather
    %% than being fed to `+', which would raise badarith.
    case active_count(Node) of
        reload ->
            reload;
        Count ->
            case active_count(Rest) of
                reload -> reload;
                RestCount -> Count + RestCount
            end
    end;
active_count([]) ->
    0;
active_count(Node) ->
    lager:log(info, self(), "Handoff rotation enountered invalid node: ~p.", [Node]),
    0.
%% @doc Receive loop of the rotation process.
%%
%% Delay       - milliseconds to wait before advancing to the next pair
%% Pairs       - non-empty list of {Node1, Node2} tuples; the head is the
%%               pair that gets enabled on the next successful tick
%% NodeList    - every node whose handoff settings are managed
%% Concurrency - handoff concurrency given to the enabled pair
%%
%% On `tick', inbound and outbound handoff are first disabled on every
%% node so that no new transfer starts, then:
%%   * no active transfers: concurrency is set to 0 everywhere, the head
%%     pair is re-enabled with Concurrency, the pair moves to the back of
%%     the list and the next tick is scheduled after Delay;
%%   * active_count/1 returned `reload': the member list is re-read (see
%%     reload_members/3);
%%   * transfers still running: a retry tick is scheduled in one minute
%%     and the loop continues with unchanged state.
%% Any other message disables handoff on all nodes, sets concurrency to 0,
%% unregisters the process and ends the loop.
loop(Delay, [{Node1, Node2} | Rest] = Pairs, NodeList, Concurrency) ->
    receive
        tick ->
            %% disable any new handoffs starting
            rpc:multicall(NodeList, application, set_env,
                          [riak_core, disable_inbound_handoff, true]),
            rpc:multicall(NodeList, application, set_env,
                          [riak_core, disable_outbound_handoff, true]),
            case active_count(NodeList) of
                0 ->
                    %% if no handoffs active, move on to the next pair
                    rpc:multicall(NodeList, riak_core_handoff_manager,
                                  set_concurrency, [0]),
                    rpc:multicall([Node1, Node2], application, set_env,
                                  [riak_core, disable_inbound_handoff, false]),
                    rpc:multicall([Node1, Node2], application, set_env,
                                  [riak_core, disable_outbound_handoff, false]),
                    rpc:multicall([Node1, Node2], riak_core_handoff_manager,
                                  set_concurrency, [Concurrency]),
                    lager:log(info, self(), "handoff_concurrency changed: ~p",
                              [[{Node, rpc:call(Node, application, get_env,
                                                [riak_core, handoff_concurrency])}
                                || Node <- NodeList]]),
                    erlang:send_after(Delay, self(), tick),
                    loop(Delay, Rest ++ [{Node1, Node2}], NodeList, Concurrency);
                reload ->
                    reload_members(Delay, {Node1, Node2}, Concurrency);
                _ ->
                    %% don't interrupt handoff in progress, wait 1 minute.
                    %% The loop must keep running to receive that tick;
                    %% returning here would end the process and silently
                    %% stop the rotation.
                    erlang:send_after(60000, self(), tick),
                    loop(Delay, Pairs, NodeList, Concurrency)
            end;
        _ ->
            lager:log(info, self(), "Stopping round-robin handoff."),
            rpc:multicall(NodeList, application, set_env,
                          [riak_core, disable_inbound_handoff, true]),
            rpc:multicall(NodeList, application, set_env,
                          [riak_core, disable_outbound_handoff, true]),
            rpc:multicall(NodeList, riak_core_handoff_manager, set_concurrency, [0]),
            unregister(?MODULE)
    end.

%% Re-read the cluster members from the local ring and resume the loop.
%% The rebuilt pair list is rotated so that it starts at CurrentPair when
%% that pair still exists; a tick is scheduled one second later. With
%% fewer than two members the rotation ends (the function just returns).
reload_members(Delay, CurrentPair, Concurrency) ->
    lager:log(info, self(), "Reloading member list"),
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    Members = riak_core_ring:all_members(Ring),
    case length(Members) >= 2 of
        true ->
            {Left, Right} = lists:splitwith(fun(Pair) -> Pair =/= CurrentPair end,
                                            buildpairs(Members)),
            %% immediate tick to try again
            erlang:send_after(1000, self(), tick),
            loop(Delay, Right ++ Left, Members, Concurrency);
        false ->
            lager:log(info, self(), "Stopping round-robin handoff: Need at least 2 cluster members to handoff")
    end.
%% @doc Build the list of all 2-node combinations of a node list.
%%
%% Each node is paired with every node that follows it in the list, so a
%% list of N nodes yields N*(N-1)/2 tuples, e.g.
%% [a,b,c] -> [{a,b},{a,c},{b,c}]. A list with fewer than two nodes has no
%% pairs and yields [] instead of failing with function_clause.
buildpairs([Node | Rest]) ->
    [{Node, Other} || Other <- Rest] ++ buildpairs(Rest);
buildpairs([]) ->
    [].
%% start/0, start/1 and start/2 begin the round-robin handoff rotation,
%% advancing to the next node pair after Delay minutes.
%% start() uses the defaults: a delay of 60 (one hour) and a handoff
%% concurrency of 2.
start() ->
    DefaultDelay = 60,
    DefaultConcurrency = 2,
    start(DefaultDelay, DefaultConcurrency).
%% Start the rotation with a delay of N and the default handoff
%% concurrency of 2.
start(N) ->
    DefaultConcurrency = 2,
    start(N, DefaultConcurrency).
%% Start the rotation process and register it under ?MODULE, unless a
%% process is already registered under that name.
%%
%% Delay is the rotation interval in minutes (it is logged as minutes and
%% multiplied by 60000 before being handed to loop/4); Concurrency is the
%% handoff concurrency handed to loop/4.
%%
%% Returns the result of erlang:send_after/3 when the process was started,
%% {error, "Already Started"} when the name is taken, or
%% {error, "Need at least 2 cluster members to handoff"} when the ring
%% reports fewer than two members.
start(Delay, Concurrency) ->
    Registered = whereis(?MODULE),
    if
        Registered =/= undefined ->
            {error, "Already Started"};
        true ->
            lager:log(info, self(), "Starting round-robin handoff_concurrency of ~p every ~p minutes.", [Concurrency, Delay]),
            {ok, Ring} = riak_core_ring_manager:get_my_ring(),
            case riak_core_ring:all_members(Ring) of
                Members when length(Members) > 1 ->
                    Rotation = fun() ->
                                       loop(Delay * 60000, buildpairs(Members),
                                            Members, Concurrency)
                               end,
                    register(?MODULE, spawn(Rotation)),
                    %% NOTE(review): Delay is passed unscaled here, so the
                    %% first tick fires after Delay milliseconds rather
                    %% than Delay minutes - presumably so the first pair is
                    %% enabled right away; confirm this is intended.
                    erlang:send_after(Delay, ?MODULE, tick);
                _ ->
                    {error, "Need at least 2 cluster members to handoff"}
            end
    end.
%% Ask the rotation process to stop.
%% Returns {error, "Not running"} when nothing is registered under
%% ?MODULE; otherwise sends `stop' to the registered process and returns
%% the sent message.
stop() ->
    Registered = whereis(?MODULE),
    if
        Registered =:= undefined ->
            {error, "Not running"};
        true ->
            Registered ! stop
    end.
%% Send a `tick' to the rotation process right away, without waiting for
%% its timer.
%% Returns the sent message (`tick') when the process is registered under
%% ?MODULE, or {error, "Not running"} when it is not - the same answer
%% stop/0 gives - instead of raising badarg from sending to `undefined'.
step() ->
    case whereis(?MODULE) of
        undefined ->
            {error, "Not running"};
        Pid ->
            Pid ! tick
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment