Skip to content

Instantly share code, notes, and snippets.

@boorad
Created August 3, 2009 22:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save boorad/160871 to your computer and use it in GitHub Desktop.
Save boorad/160871 to your computer and use it in GitHub Desktop.
%%% -*- erlang-indent-level:2 -*-
%%%-------------------------------------------------------------------
%%% File: membership2.erl
%%% @author Cliff Moon <cliff@powerset.com> []
%%% @copyright 2009 Cliff Moon
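%%% @doc
%%% Cluster membership gen_server. Keeps the list of known nodes, the
%%% partition map and the registered storage server pids, saves that state
%%% to a per-node ".state" file, and exchanges updates with other nodes
%%% through join calls and gossip casts.
%%% @end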
%%%
%%% @since 2009-05-04 by Cliff Moon
%%%-------------------------------------------------------------------
-module(membership2).
-author('cliff@powerset.com').
-author('brad@cloudant.com').
-behaviour(gen_server).
%% API
-export([start_link/2, start_link/3, register/2, servers_for_key/1, stop/1,
partitions/0, servers/0, servers/1, servers_to_list/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% includes
-include("../include/config.hrl").
-include("../include/common.hrl").
-include_lib("eunit/include/eunit.hrl").
%% init,join,gossip,register,down
-define(GOSSIP_INFO, []).
%%====================================================================
%% API
%%====================================================================
%% @doc Starts the server
%% @end
%%--------------------------------------------------------------------
start_link(Node, Nodes) ->
  %% Convenience arity: start with an empty option list.
  NoArgs = [],
  start_link(Node, Nodes, NoArgs).
start_link(Node, Nodes, Args) ->
  %% The process is registered locally under the name `membership';
  %% init/1 receives the three values as a single list.
  ServerName = {local, membership},
  InitArgs = [Node, Nodes, Args],
  gen_server:start_link(ServerName, ?MODULE, InitArgs, []).
%% @doc Asynchronously tells the locally registered `membership' server
%% that Pid serves Partition.
%% @end
register(Partition, Pid) ->
  Request = {register, Partition, Pid},
  gen_server:cast(membership, Request).
%% @doc Asynchronously sends the `stop' request to Server; handle_cast/2
%% answers it by stopping with reason `normal'.
%% @end
stop(Server) ->
  StopRequest = stop,
  gen_server:cast(Server, StopRequest).
%% @doc Synchronously asks the local `membership' server for its partition
%% map; the server replies with `{ok, Partitions}'.
%% @end
partitions() ->
  Request = partitions,
  gen_server:call(membership, Request).
%% @doc Same as `servers(all)': returns both views of the server table.
%% @end
servers() ->
  Type = all,
  servers(Type).
%% @doc Synchronously asks the local `membership' server for its registered
%% storage servers. Type `list' yields `{ok, SortedPartitionPidPairs}',
%% `ets' yields `{ok, RawTableContents}', and any other value yields
%% `{ok, [{list, ...}, {ets, ...}]}' with both views.
%% @end
servers(Type) ->
  Request = {servers, Type},
  gen_server:call(membership, Request).
%% @doc Synchronously asks the local `membership' server for the pids
%% registered for the partition that Key hashes to.
%% @end
servers_for_key(Key) ->
  Request = {servers_for_key, Key},
  gen_server:call(membership, Request).
%% @doc Returns the sorted list of `{Partition, Pid}' entries held in the
%% Servers table. This is everything we know of, not just local servers.
%% The `{Ref, Partition, Pid}' monitor bookkeeping entries are left out.
%% @end
servers_to_list(Servers) ->
  %% Two explicit clauses on purpose: any other entry shape is unexpected
  %% and crashes, exactly as a fold over the table would.
  IsServerEntry = fun
                    ({_Partition, _Pid}) -> true;
                    ({_Ref, _Partition, _Pid}) -> false
                  end,
  Pairs = lists:filter(IsServerEntry, ets:tab2list(Servers)),
  lists:sort(Pairs).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @doc Initiates the server
%% @end
%%--------------------------------------------------------------------
init([Node, Nodes, Args]) ->
  process_flag(trap_exit, true),
  Config = configuration:get_config(),
  %% load/1 returns a #membership{} either read from disk or freshly built.
  #membership{} = Persisted = load(Node),
  Servers = ets:new(member_servers, [public, bag]),
  Hints = proplists:get_value(hints, Args, []),
  %% Hints = showroom_brain:hints(PartialNodes) TODO
  {WorldNodes, PMap, Version} =
    initial_membership(Persisted, Node, Nodes, Hints, Config),
  State = #membership{node = Node,
                      nodes = WorldNodes,
                      partitions = PMap,
                      version = Version,
                      servers = Servers},
  save(State),
  ?infoMsg("Loading storage servers.~n"),
  LocalPartitions = int_partitions_for_node(Node, State, all),
  storage_manager:load(Nodes, Servers, PMap, LocalPartitions,
                       [{type,init},{dst,Node}]),
  {ok, State}.

%% Decides the starting {WorldNodes, PartitionMap, Version} triple.
%% First clause: no partition map was persisted, so join the partners and
%% obtain (or create) a partition map. Second clause: persisted state is
%% used as-is and any supplied hints are only logged as ignored.
initial_membership(#membership{partitions = undefined,
                               nodes = PersistentNodes},
                   Node, Nodes, Hints, Config) ->
  % didn't find persistent state on disk
  PartialNodes = lists:usort(Nodes ++ PersistentNodes),
  Partners = replication:partners(Node, Nodes, Config),
  PartnersPlus = partners_plus(Node, Partners, Nodes),
  {JoinedVersion, RemoteNodes} = join_to(Node, PartnersPlus, Hints),
  NewWorldNodes = lists:usort(PartialNodes ++ RemoteNodes),
  InitialPMap = get_init_pmap(Node, Partners, NewWorldNodes, Config),
  {NewWorldNodes, InitialPMap, vector_clock:increment(node(), JoinedVersion)};
initial_membership(#membership{nodes = PersistentNodes,
                               partitions = PersistentParts,
                               version = PersistentVersion},
                   _Node, _Nodes, Hints, _Config) ->
  % found persistent state on disk
  case Hints of
    [] ->
      ok;
    _ ->
      ?infoFmt("~nPersistent State was loaded from disk. Hints ~p "
               "ignored.", [Hints])
  end,
  {PersistentNodes, PersistentParts, PersistentVersion}.
%%--------------------------------------------------------------------
%% @spec
%% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @doc Handling call messages
%% @end
%%--------------------------------------------------------------------
%% join
handle_call({join, JoiningNode, Hints}, _From,
            #membership{version = Clock, node = Self, nodes = KnownNodes,
                        servers = ServerTab,
                        partitions = CurrentPMap} = State) ->
  WorldNodes = lists:usort(KnownNodes ++ [JoiningNode]),
  JoinResult = partitions:join(JoiningNode, CurrentPMap, Hints),
  PMap = case JoinResult of
           {error, Error, _Table} -> throw({join_error, Error});
           {ok, Table} -> Table
         end,
  BumpedClock = vector_clock:increment(node(), Clock),
  NewState = State#membership{nodes = WorldNodes,
                              partitions = PMap,
                              version = BumpedClock},
  save(NewState),
  %% clean up this node's storage servers, after JoiningNode just joined.
  %% The node list handed over is the one from before the join.
  LocalPartitions = int_partitions_for_node(Self, NewState, all),
  storage_manager:load(KnownNodes, ServerTab, PMap, LocalPartitions,
                       [{type,join},{dst,Self}]),
  fire_gossip(Self, WorldNodes, gossip_join, {BumpedClock, PMap}),
  {reply, {BumpedClock, WorldNodes}, NewState};
%% servers_for_key: pids registered for the partition the key hashes to
handle_call({servers_for_key, Key}, _From,
            #membership{servers = ServerTab} = State) ->
  Config = configuration:get_config(),
  Hash = lib_misc:hash(Key),
  Hex = partitions:hash_to_hex(Hash, Config#config.q),
  Partition = list_to_atom(Hex),
  Pids = [Pid || {Part, Pid} <- servers_to_list(ServerTab),
                 Part =:= Partition],
  {reply, Pids, State};
%% state: hand back the whole server state
handle_call(state, _From, State) ->
  {reply, State, State};
%% partitions: the current partition map
handle_call(partitions, _From, #membership{partitions = PMap} = State) ->
  {reply, {ok, PMap}, State};
%% servers: as a sorted pair list, as raw table contents, or both
handle_call({servers, list}, _From, #membership{servers = ServerTab} = State) ->
  {reply, {ok, servers_to_list(ServerTab)}, State};
handle_call({servers, ets}, _From, #membership{servers = ServerTab} = State) ->
  {reply, {ok, ets:tab2list(ServerTab)}, State};
handle_call({servers, _}, _From, #membership{servers = ServerTab} = State) ->
  BothViews = [{list, servers_to_list(ServerTab)},
               {ets, ets:tab2list(ServerTab)}],
  {reply, {ok, BothViews}, State};
%% anything else is answered with `ignored'
handle_call(_Request, _From, State) ->
  {reply, ignored, State}.
%%--------------------------------------------------------------------
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @doc Handling cast messages
%% @end
%%--------------------------------------------------------------------
%% register
handle_cast({register, Partition, Pid},
            #membership{node = Me, nodes = Nodes, servers = Servers,
                        version = Version} = State) ->
  %% ?debugFmt("~nRegistering ~p~n", [{Partition, Pid}]),
  %% Monitor the server so a 'DOWN' message lets us unregister it later;
  %% the monitor reference is stored next to the plain {Partition, Pid} entry.
  MonitorRef = erlang:monitor(process, Pid),
  ets:insert(Servers, [{Partition, Pid}, {MonitorRef, Partition, Pid}]),
  BumpedVersion = vector_clock:increment(node(), Version),
  Gossip = {BumpedVersion, servers_to_list(Servers),
            MonitorRef, Partition, Pid},
  fire_gossip(Me, Nodes, gossip_register, Gossip),
  {noreply, State#membership{version = BumpedVersion}};
%% gossip_join
handle_cast({gossip_join, {RemoteVersion, RemotePMap}, RemoteNode},
            #membership{version = LocalVersion, node = LocalNode,
                        partitions = LocalPMap, nodes = Nodes} = State) ->
  case merge_pmaps(RemoteVersion, RemotePMap, RemoteNode,
                   LocalVersion, LocalPMap, LocalNode) of
    {equal, _SameVersion, _SamePMap} ->
      %% nothing new: do not re-gossip
      {noreply, State};
    {merged, MergedVersion, MergedPMap} ->
      fire_gossip(LocalNode, Nodes, gossip_join, {MergedVersion, MergedPMap}),
      {noreply, State#membership{version = MergedVersion,
                                 partitions = MergedPMap}}
  end;
%% gossip_register and gossip_down TODO: guards for Type?
handle_cast({Type, {RemoteVersion, RemoteServerList, Ref, Part, Pid},
             RemoteNode},
            #membership{version = LocalVersion, node = LocalNode,
                        servers = LocalServers, nodes = Nodes} = State) ->
  %% Name of the ets function applied to the local table for this gossip.
  Op = case Type of
         gossip_register -> insert;
         gossip_down -> delete_object
       end,
  case merge_servers(RemoteVersion, RemoteServerList, RemoteNode,
                     Op, Ref, Part, Pid,
                     LocalVersion, LocalServers, LocalNode) of
    {equal, _SameVersion, _SameServerList} ->
      {noreply, State};
    {merged, MergedVersion, MergedServerList} ->
      ?debugFmt("~nMergedServerList: ~p~n", [MergedServerList]),
      server_list_into_table(MergedServerList, LocalServers),
      fire_gossip(LocalNode, Nodes, Type,
                  {MergedVersion, MergedServerList, Ref, Part, Pid}),
      NewState = State#membership{version = MergedVersion},
      LocalPartitions = int_partitions_for_node(LocalNode, NewState, all),
      storage_manager:load(Nodes, LocalServers,
                           NewState#membership.partitions,
                           LocalPartitions,
                           [{type,Type},{dst,LocalNode}]),
      {noreply, NewState}
  end;
%% stop
handle_cast(stop, State) ->
  {stop, normal, State}.
%%--------------------------------------------------------------------
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @doc Handling all non call/cast messages
%% @end
%%--------------------------------------------------------------------
handle_info({'DOWN', Ref, _Type, Pid, _Reason},
            #membership{node = Me, nodes = Nodes, servers = Servers,
                        version = Version} = State) ->
  %% A monitored storage server died: drop both of its table entries and
  %% gossip the removal.
  erlang:demonitor(Ref),
  [{Ref, Partition, Pid} = Monitored] = ets:lookup(Servers, Ref),
  ets:delete_object(Servers, Monitored),
  ets:delete_object(Servers, {Partition, Pid}),
  ?infoFmt("Storage Server Pid is down ~p~n", [Monitored]),
  BumpedVersion = vector_clock:increment(node(), Version),
  Gossip = {BumpedVersion, servers_to_list(Servers), Ref, Partition, Pid},
  fire_gossip(Me, Nodes, gossip_down, Gossip),
  {noreply, State#membership{version = BumpedVersion}};
handle_info(Unexpected, State) ->
  %% Any other message is only printed.
  ?debugFmt("~nInfo: ~p~n", [Unexpected]),
  {noreply, State}.
%%--------------------------------------------------------------------
%% @spec terminate(Reason, State) -> void()
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%% @end
%%--------------------------------------------------------------------
terminate(_Why, _Membership) ->
  %% No explicit cleanup is performed here.
  ok.
%%--------------------------------------------------------------------
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @doc Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
code_change(_FromVsn, Membership, _Extra) ->
  %% The state is carried over unchanged across code upgrades.
  {ok, Membership}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
%% @doc Reads the persisted membership state for Node from
%% "<directory>/<node name>.state". When the file cannot be consulted, the
%% reason is logged and a fresh #membership{} with an empty node list is
%% returned instead.
%% @end
load(Node) ->
  Config = configuration:get_config(),
  Directory = Config#config.directory,
  StateFile = lists:concat([node:name(Node), ".state"]),
  Path = filename:join([Directory, StateFile]),
  case file:consult(Path) of
    {ok, [Saved]} ->
      %% the file holds exactly one term: the saved state
      Saved;
    {error, Reason} ->
      ?infoFmt("Could not load state: ~p~n", [Reason]),
      #membership{nodes=[]}
  end.
%% @doc Writes State as a single consultable term to
%% "<directory>/<node name>.state" and returns `ok'.
%%
%% The `servers' field holds a runtime ETS table handle. It is blanked
%% before writing: the handle means nothing after a restart, and on OTP
%% releases where table identifiers are references it prints as
%% "#Ref<...>", which file:consult/1 cannot parse, making the whole file
%% unreadable. init/1 always creates a new table and never reads this field
%% from the loaded state.
%%
%% Crashes with a badmatch if the file cannot be written.
%% @end
save(State) ->
  Config = configuration:get_config(),
  Filename = filename:join([Config#config.directory,
                            lists:concat([node:name(State#membership.node),
                                          ".state"])]),
  Persistable = State#membership{servers = undefined},
  %% write_file/2 opens, writes and closes in one step, so no file
  %% descriptor is left open when formatting or writing fails.
  ok = file:write_file(Filename, io_lib:format("~w.~n", [Persistable])).
%% @doc Joins Node to each of Partners in turn and returns
%% `{Version, KnownNodes}'.
%% Joining is bi-directional, as opposed to gossip which is unidirectional.
%% We want to collect the list of known nodes to compute the partition map,
%% which isn't necessarily the same as the list of running nodes.
%% @end
join_to(Node, Partners, Hints) ->
  %% Start from a fresh vector clock for this node and no known nodes.
  InitialVersion = vector_clock:create(node()),
  join_to(Node, Partners, {InitialVersion, []}, Hints).
%% Accumulating worker for join_to/3.
%% Walks the partner list, asking each one to accept Node. Every successful
%% reply `{RemoteVersion, RemoteNodes}' is folded into the accumulator: the
%% vector clocks are merged and the node lists are unioned. Partners whose
%% call failed ({'EXIT', _} from call_join/3) or that answered with anything
%% else are skipped, so the result is always `{Version, World}'.
join_to(_Node, [], {Version, World}, _Hints) ->
  {Version, World};
join_to(Node, [Remote | Partners], {Version, World} = Acc, Hints) ->
  case call_join(Remote, Node, Hints) of
    {'EXIT', _Reason} ->
      join_to(Node, Partners, Acc, Hints);
    {RemoteVersion, NewNodes} when is_list(NewNodes) ->
      Merged = {vector_clock:merge(Version, RemoteVersion),
                lists:usort(World ++ NewNodes)},
      %% Hints must be passed along here. Without it the call resolves to
      %% join_to/3, which restarts from an empty accumulator and treats the
      %% accumulator tuple as Hints.
      join_to(Node, Partners, Merged, Hints);
    Val ->
      %% Unexpected reply: report it and carry on with the other partners
      %% rather than returning the logging result to the caller.
      ?debugFmt("join_to unexpected value: ~p~n", [Val]),
      join_to(Node, Partners, Acc, Hints)
  end.
%% Asks the `membership' server on Remote to accept Node. Failures are not
%% raised: the old-style catch turns them into an {'EXIT', Reason} value
%% that the caller matches on.
call_join(Remote, Node, Hints) ->
  JoinRequest = {join, Node, Hints},
  catch gen_server:call({membership, node:name(Remote)}, JoinRequest).
%% Compares the remote and local vector clocks and returns
%% {equal, LocalVersion, LocalPMap} when the remote side brings nothing new
%% (clocks equal, or remote older), otherwise
%% {merged, MergedVersion, RemotePMap}.
merge_pmaps(RemoteVersion, RemotePMap, _RemoteNode,
            LocalVersion, LocalPMap, _LocalNode) ->
  case vector_clock:compare(RemoteVersion, LocalVersion) of
    Order when Order =:= equal; Order =:= less ->
      {equal, LocalVersion, LocalPMap};
    Order when Order =:= greater; Order =:= concurrent ->
      %% TODO (concurrent case): use clocks/nodes to determine proper map
      %% to return; for now the remote map wins in both cases.
      MergedVersion = vector_clock:merge(RemoteVersion, LocalVersion),
      {merged, MergedVersion, RemotePMap}
  end.
%% Reconciles a gossiped server list with the local server table.
%% Returns {equal, LocalVersion, LocalServerList} when the remote clock is
%% equal to or older than the local one. Otherwise the gossiped operation
%% (Op is the name of an ets function, applied to both the {Part, Pid} and
%% the {Ref, Part, Pid} entry) is performed on the local table, and
%% {merged, MergedVersion, MergedServerList} is returned.
merge_servers(RemoteVersion, RemoteServerList, _RemoteNode,
              Op, Ref, Part, Pid,
              LocalVersion, LocalServers, _LocalNode) ->
  LocalServerList = servers_to_list(LocalServers),
  case vector_clock:compare(RemoteVersion, LocalVersion) of
    equal ->
      ?debugMsg("remote clock EQUAL"),
      {equal, LocalVersion, LocalServerList};
    less ->
      ?debugMsg("remote clock LESS"),
      {equal, LocalVersion, LocalServerList};
    greater ->
      ?debugMsg("remote clock GREATER"),
      apply_remote_servers(RemoteVersion, RemoteServerList,
                           Op, Ref, Part, Pid,
                           LocalVersion, LocalServers);
    concurrent ->
      ?debugMsg("remote clock CONCURRENT"),
      %% TODO: use clocks/nodes to determine proper map to return
      apply_remote_servers(RemoteVersion, RemoteServerList,
                           Op, Ref, Part, Pid,
                           LocalVersion, LocalServers)
  end.

%% Shared by the `greater' and `concurrent' outcomes: apply Op to the local
%% table, then merge the remote list with the updated local list and merge
%% the two clocks.
%% TODO: the register cast could use this as well
apply_remote_servers(RemoteVersion, RemoteServerList,
                     Op, Ref, Part, Pid,
                     LocalVersion, LocalServers) ->
  ets:Op(LocalServers, {Part, Pid}),
  ets:Op(LocalServers, {Ref, Part, Pid}),
  UpdatedLocalList = servers_to_list(LocalServers),
  MergedServerList = merge_server_lists(RemoteServerList, UpdatedLocalList),
  MergedVersion = vector_clock:merge(RemoteVersion, LocalVersion),
  {merged, MergedVersion, MergedServerList}.
%% Union of two server lists, sorted and without duplicates.
%% Both inputs are assumed to be sorted already (as servers_to_list/1
%% produces them).
merge_server_lists(Remote, Local) ->
  Interleaved = lists:merge(Local, Remote),
  lists:usort(Interleaved).
%% Casts {Type, Gossip, Me} to the `membership' server on each gossip
%% partner of Me (its partners, or the fallback chosen by partners_plus/3).
%% Always returns ok.
fire_gossip(Me, WorldNodes, Type, Gossip) ->
  Partners = replication:partners(Me, WorldNodes),
  Message = {Type,Gossip,Me},
  _ = [gen_server:cast({membership, TargetNode}, Message)
       || TargetNode <- partners_plus(Me, Partners, WorldNodes)],
  ok.
%% @doc Returns the Partners that are currently connected (present in
%% nodes()). If all Partners are down, walks the replication target list of
%% Node, skipping Node itself and its Partners, and returns at most one
%% connected remote node from it.
%% @end
partners_plus(Node, Partners, WorldNodes) ->
  Unreachable = Partners -- nodes(),
  case Partners -- Unreachable of
    [] ->
      Candidates = replication:target_list(Node, WorldNodes),
      Excluded = lists:flatten([Node, Partners]),
      walk_ring(Candidates -- Excluded);
    Reachable ->
      %% at least one partner is up, so gossip w/ them
      Reachable
  end.
%% Returns a one-element list holding the first candidate that is currently
%% connected, or [] (after logging) when none is.
walk_ring(Candidates) ->
  IsDown = fun(Candidate) -> not lists:member(Candidate, nodes()) end,
  case lists:dropwhile(IsDown, Candidates) of
    [] ->
      %% TODO: should we be more forceful here and throw? not for now
      ?infoFmt("~p:walk_ring/1 - could not find node for gossip~n", [?MODULE]),
      [];
    [FirstUp | _Rest] ->
      [FirstUp]
  end.
%% Inserts every {Partition, Pid} pair of the list into the Servers table
%% and returns ok.
server_list_into_table([], _Servers) ->
  ok;
server_list_into_table([{Partition, Pid} | Remaining], Servers) ->
  ets:insert(Servers, {Partition, Pid}),
  server_list_into_table(Remaining, Servers).
%% Partitions relevant to Node, read from the {Node, Partition} pairs of the
%% state's partition map.
%% `master': only the partitions whose pair names Node itself.
%% `all': the master partitions of Node and of each of its replication
%% partners, merged together.
int_partitions_for_node(Node, State, master) ->
  OwnedByNode = fun({Owner, _Partition}) -> Owner == Node end,
  Owned = lists:filter(OwnedByNode, State#membership.partitions),
  [Partition || {_Owner, Partition} <- Owned];
int_partitions_for_node(Node, State, all) ->
  Config = configuration:get_config(),
  Partners = replication:partners(Node, State#membership.nodes, Config),
  Owners = lists:flatten([Node, Partners]),
  MergeMasters = fun(Owner, Merged) ->
                     Masters = int_partitions_for_node(Owner, State, master),
                     lists:merge(Merged, Masters)
                 end,
  lists:foldl(MergeMasters, [], Owners).
%% @doc get the partition table/map from some remote nodes, or
%% create a new one
%% @end
%% TODO: do vector clocks need to be here, to ensure we get most
%% up-to-date table?
get_init_pmap(Node, [], WorldNodes, Config) ->
  %% no partners to ask: build a brand new table
  partitions:create_partitions(Config#config.q, Node, WorldNodes);
get_init_pmap(Node, Partners, WorldNodes, Config) ->
  case get_table_from_remotes(Partners) of
    {ok, Table} ->
      Table;
    _Error ->
      %% ok, we got some error, so we're gonna provide a fresh
      %% parts table. Hopefully gossip fixes this
      partitions:create_partitions(Config#config.q, Node, WorldNodes)
  end.
%% Asks each remote `membership' server in turn for its partition table and
%% returns the first {ok, Table} obtained. A remote that fails or answers
%% with anything else is skipped (deliberate best effort). Returns
%% no_table_from_remotes once the list is exhausted.
get_table_from_remotes([]) ->
  no_table_from_remotes;
get_table_from_remotes([Remote | Others]) ->
  try gen_server:call({membership, Remote}, partitions) of
    {ok, Table} ->
      {ok, Table};
    _Other ->
      get_table_from_remotes(Others)
  catch
    _:_ ->
      get_table_from_remotes(Others)
  end.
%%
%% internal tests
%%
int_parts_for_node_self_test() ->
  %% TODO: effigy mock configuration N=3
  %% Single-node ring: node a owns every partition.
  State = #membership{node=a, nodes=[a], partitions=[{a,1},{a,2},{a,3}]},
  Actual = int_partitions_for_node(a, State, all),
  ?assertEqual([1,2,3], Actual),
  ok.
int_parts_for_node_partners_test() ->
  %% TODO: effigy mock configuration N=2
  %% Known nodes are a and b; c appears only in the partition map.
  PMap = [{a,1},{a,2},{b,3},{b,4},{c,5},{c,6}],
  State = #membership{node=a, nodes=[a,b], partitions=PMap},
  Actual = int_partitions_for_node(a, State, all),
  ?assertEqual([1,2,3,4], Actual),
  ok.
int_parts_for_node_bigring_test() ->
  %% TODO: effigy mock configuration N=3
  %% Four-node ring; the expectation covers the partitions of a, b and c.
  PMap = [{a,1},{a,2},{b,3},{b,4},{c,5},{c,6},{d,7},{d,8}],
  State = #membership{node=a, nodes=[a,b,c,d], partitions=PMap},
  Actual = int_partitions_for_node(a, State, all),
  ?assertEqual([1,2,3,4,5,6], Actual),
  ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment