Skip to content

Instantly share code, notes, and snippets.

@boorad
Created July 21, 2009 03:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save boorad/151090 to your computer and use it in GitHub Desktop.
Save boorad/151090 to your computer and use it in GitHub Desktop.
%%% -*- erlang-indent-level:2 -*-
%%%-------------------------------------------------------------------
%%% File: membership2.erl
%%% @author Cliff Moon <cliff@powerset.com> []
%%% @copyright 2009 Cliff Moon
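%%% @doc
%%% gen_server, registered locally as `membership', that keeps the list of
%%% known nodes, the partition map and the storage-server pids registered
%%% per partition, and gossips changes to its replication partners.
%%% @end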
%%%
%%% @since 2009-05-04 by Cliff Moon
%%%-------------------------------------------------------------------
-module(membership2).
-author('cliff@powerset.com').
-author('brad@cloudant.com').
-behaviour(gen_server).
%% API
-export([start_link/2, start_link/3, register/2, servers_for_key/1, stop/1,
listen/1, partitions/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% testing exports
-export([servers_to_list/1]).
%% includes
-include("../include/config.hrl").
-include("../include/common.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(GOSSIP_INFO, false).
%%====================================================================
%% API
%%====================================================================
%% @doc Starts the server
%% @end
%%--------------------------------------------------------------------
%% Convenience entry point: starts the server with an empty argument list.
start_link(Node, Nodes) ->
  NoArgs = [],
  start_link(Node, Nodes, NoArgs).
%% Starts the gen_server and registers it locally under the name
%% `membership'. Node, Nodes and Args are handed to init/1 as one list.
start_link(Node, Nodes, Args) ->
  RegisteredName = {local, membership},
  InitArgs = [Node, Nodes, Args],
  gen_server:start_link(RegisteredName, ?MODULE, InitArgs, []).
%% Asynchronously tells the local membership server that Pid serves
%% Partition. Returns immediately (gen_server:cast/2).
register(Partition, Pid) ->
  Request = {register, Partition, Pid},
  gen_server:cast(membership, Request).
%% Synchronously asks the local membership server for the pids registered
%% for the partition that Key hashes to.
servers_for_key(Key) ->
  Request = {servers_for_key, Key},
  gen_server:call(membership, Request).
%% Asynchronously asks Server to shut down; the `stop' cast makes the
%% server terminate with reason `normal'.
stop(Server) ->
  StopRequest = stop,
  gen_server:cast(Server, StopRequest).
%% Subscribes Pid to partition-map updates on the local membership server.
%% The reply is the server's current partition map.
listen(Pid) ->
  Request = {listen, Pid},
  gen_server:call(membership, Request).
%% Fetches the partition map from the local membership server.
%% The server replies with {ok, PartitionMap}.
partitions() ->
  Request = partitions,
  gen_server:call(membership, Request).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @doc Initiates the server
%% @end
%%--------------------------------------------------------------------
%% Builds the initial server state:
%%   1. merges the given Nodes with the node list persisted on disk,
%%   2. joins the replication partners to collect their version, nodes
%%      and registered servers,
%%   3. obtains (or creates) the partition map,
%%   4. persists the resulting node list and loads the storage servers.
init([Node, Nodes, Args]) ->
  process_flag(trap_exit, true),
  Config = configuration:get_config(),
  Persisted = load(Node),
  KnownNodes = lists:usort(Nodes ++ Persisted),
  Partners = replication:partners(Node, Nodes, Config),
  ServerTab = ets:new(member_servers, [public, bag]),
  %% Hints = showroom_brain:hints(PartialNodes) TODO
  JoinHints = proplists:get_value(hints, Args, []),
  {JoinedVersion, RemoteNodes} = join_to(Node, ServerTab, Partners, JoinHints),
  WorldNodes = lists:usort(KnownNodes ++ RemoteNodes),
  PMap = get_init_pmap(Node, Partners, WorldNodes, Config),
  %% record our own entry in the vector clock on top of what the partners sent
  Version = vector_clock:increment(pid_to_list(self()), JoinedVersion),
  State = #membership{node=Node,
                      nodes=WorldNodes,
                      partitions=PMap,
                      version=Version,
                      servers=ServerTab},
  save(State),
  ?infoMsg("Loading storage servers.~n"),
  OwnedPartitions = int_partitions_for_node(Node, State, all),
  storage_manager:load(Nodes, PMap, OwnedPartitions),
  {ok, State}.
%%--------------------------------------------------------------------
%% @spec
%% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% @doc Handling call messages
%% @end
%%--------------------------------------------------------------------
%% join
%% {join, JoiningNode, Hints}: adds JoiningNode to the known nodes, asks
%% partitions:join/3 for the new partition map, bumps the vector clock,
%% reloads this node's storage servers, gossips the new state and replies
%% with {NewVersion, WorldNodes, ServerList}.
handle_call({join, JoiningNode, Hints}, _From,
            #membership{version=OldVersion, node=Self, nodes=KnownNodes,
                        servers=ServerTab, partitions=OldPMap} = State) ->
  Config = configuration:get_config(),
  WorldNodes = lists:usort(KnownNodes ++ [JoiningNode]),
  NewPMap =
    case partitions:join(JoiningNode, OldPMap, Hints) of
      {error, Reason, _Table} -> throw({join_error, Reason});
      {ok, Table} -> Table
    end,
  ServerList = servers_to_list(ServerTab),
  NewVersion = vector_clock:increment(pid_to_list(self()), OldVersion),
  NewState = State#membership{nodes=WorldNodes,
                              partitions=NewPMap,
                              version=NewVersion},
  %% clean up this node's storage servers, after JoiningNode just joined.
  OwnedPartitions = int_partitions_for_node(Self, NewState, all),
  storage_manager:load(KnownNodes, NewPMap, OwnedPartitions),
  %% the debug payload is only assembled when ?GOSSIP_INFO is switched on
  Info =
    case ?GOSSIP_INFO of
      false ->
        [];
      _ ->
        [{type, join},
         {src, Self},
         {joining_node, JoiningNode},
         {old_pmap, OldPMap},
         {new_pmap, NewPMap},
         {old_version, OldVersion},
         {new_version, NewVersion},
         {servers, ServerTab}]
    end,
  fire_gossip(Self, NewState, Config, Info),
  {reply, {NewVersion, WorldNodes, ServerList}, NewState};

%% {listen, Pid}: remembers Pid as a listener (duplicates are collapsed by
%% usort) and replies with the current partition map.
handle_call({listen, Pid}, _From,
            #membership{listeners=Current, partitions=PMap} = State) ->
  Listeners = lists:usort([Pid | Current]),
  {reply, PMap, State#membership{listeners=Listeners}};

%% {servers_for_key, Key}: hashes Key to a partition and replies with the
%% pids stored for that partition in the servers table.
handle_call({servers_for_key, Key}, _From,
            #membership{servers=ServerTab} = State) ->
  Config = configuration:get_config(),
  Partition = partitions:hash_to_partition(lib_misc:hash(Key),
                                           Config#config.q),
  ?debugFmt("getting for partition ~p", [Partition]),
  Pids = lists:map(fun({_, Pid}) -> Pid end,
                   ets:lookup(ServerTab, Partition)),
  {reply, Pids, State};

%% state: replies with the whole server state record.
handle_call(state, _From, State) ->
  {reply, State, State};

%% partitions: replies with {ok, PartitionMap}.
handle_call(partitions, _From, #membership{partitions=PMap} = State) ->
  {reply, {ok, PMap}, State}.
%%--------------------------------------------------------------------
%% @spec handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @doc Handling cast messages
%% @end
%%--------------------------------------------------------------------
%% gossip
%% {gossip, ...}: merges the remote view into the local state. When the
%% vector clocks are equal nothing else happens; otherwise the storage
%% servers are reloaded, the merged state is gossiped on and the listeners
%% are sent the new partition map.
handle_cast({gossip, Version, Nodes, ServerList, PMap, Info},
            #membership{node=Me} = State) ->
  case ?GOSSIP_INFO of
    false ->
      ok;
    _ ->
      ?debugFmt("~nreceive gossip...~n"
                "Self : ~p~n"
                "Info : ~p~n"
                ,[self(), Info])
  end,
  case merge_state(Version, Nodes, ServerList, PMap, State) of
    {equal, Unchanged} ->
      {noreply, Unchanged};
    {merged, Merged} ->
      MergedPMap = Merged#membership.partitions,
      OwnedPartitions = int_partitions_for_node(Me, Merged, all),
      storage_manager:load(Nodes, MergedPMap, OwnedPartitions),
      %% the debug payload is only assembled when ?GOSSIP_INFO is switched on
      NewInfo =
        case ?GOSSIP_INFO of
          false ->
            [];
          _ ->
            [{type, gossip},
             {src, Me},
             {old_pmap, PMap},
             {new_pmap, MergedPMap},
             {old_version, Version},
             {new_version, Merged#membership.version},
             {servers, ServerList}]
        end,
      Config = configuration:get_config(),
      fire_gossip(Me, Merged, Config, NewInfo),
      publish_map_to_listeners(Merged),
      {noreply, Merged}
  end;

%% {register, Partition, Pid}: monitors Pid, stores both the
%% {Partition, Pid} entry and the {Ref, Partition, Pid} entry used to find
%% it again on 'DOWN', bumps the vector clock and gossips the change.
handle_cast({register, Partition, Pid},
            #membership{node=Me, servers=Servers, version=Version} = State) ->
  ?debugFmt("~nRegistering ~p~n", [{Partition, Pid}]),
  MonitorRef = erlang:monitor(process, Pid),
  ets:insert(Servers, [{Partition, Pid}, {MonitorRef, Partition, Pid}]),
  Bumped = vector_clock:increment(pid_to_list(self()), Version),
  NewState = State#membership{version=Bumped},
  Info = [{type, register},
          {src, Me},
          {servers, servers_to_list(Servers)}],
  Config = configuration:get_config(),
  fire_gossip(Me, NewState, Config, Info),
  {noreply, NewState};

%% stop: terminates the server with reason `normal'.
handle_cast(stop, State) ->
  {stop, normal, State}.
%%--------------------------------------------------------------------
%% @spec handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% @doc Handling all non call/cast messages
%% @end
%%--------------------------------------------------------------------
%% 'DOWN' from a monitored storage server: removes the monitor entry and the
%% {Partition, Pid} entry from the servers table, bumps the vector clock and
%% gossips the change to the partners.
%%
%% The clause returns the state carrying the incremented version. Returning
%% the old state here would gossip a version that the local server itself
%% never records.
handle_info({'DOWN', Ref, _, Pid, _},
            State = #membership{node=Me, servers=Servers,
                                version=Version}) ->
  erlang:demonitor(Ref),
  %% every monitor created on register has exactly one {Ref, Partition, Pid}
  %% row; anything else is a bug and crashes here on purpose
  [{Ref, Partition, Pid}] = ets:lookup(Servers, Ref),
  ets:delete_object(Servers, {Ref, Partition, Pid}),
  ets:delete_object(Servers, {Partition, Pid}),
  ?debugFmt("Pid is down ~p", [{Ref, Partition, Pid}]),
  ?debugFmt("~p", [ets:lookup(Servers, Partition)]),
  NewState = State#membership{
               version=vector_clock:increment(pid_to_list(self()), Version)},
  Info = [ {type, 'DOWN'}
         , {src, Me}
         , {servers, servers_to_list(Servers)}
         ],
  fire_gossip(Me, NewState, configuration:get_config(), Info),
  {noreply, NewState};
%% Any other message (including 'EXIT' signals, since the server traps
%% exits) is logged and dropped.
handle_info(Info, State) ->
  ?debugFmt("~nInfo: ~p~n", [Info]),
  {noreply, State}.
%%--------------------------------------------------------------------
%% @spec terminate(Reason, State) -> void()
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%% @end
%%--------------------------------------------------------------------
%% Nothing to clean up explicitly; always returns ok.
terminate(_Why, _FinalState) ->
  ok.
%%--------------------------------------------------------------------
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @doc Convert process state when code is changed
%% @end
%%--------------------------------------------------------------------
%% No state migration is needed: the state is passed through untouched.
code_change(_FromVsn, State, _Extra) ->
  Unchanged = State,
  {ok, Unchanged}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
%% return list of known nodes from membership file
%% Reads the single term stored in "<node name>.world" inside the configured
%% directory and returns it. If the file cannot be consulted the reason is
%% logged and [] is returned.
load(Node) ->
  Config = configuration:get_config(),
  Dir = Config#config.directory,
  WorldFile = lists:concat([node:name(Node), ".world"]),
  Path = filename:join([Dir, WorldFile]),
  case file:consult(Path) of
    {ok, [Terms]} ->
      Terms;
    {error, Reason} ->
      ?infoFmt("Could not load state: ~p~n", [Reason]),
      []
  end.
%% save the list of known nodes to a file
%% Writes the state's node list as one consultable term ("~w.") to
%% "<node name>.world" in the configured directory, replacing any previous
%% contents. Crashes if the file cannot be opened; returns the result of
%% file:close/1.
save(State) ->
  Config = configuration:get_config(),
  Dir = Config#config.directory,
  WorldFile = lists:concat([node:name(State#membership.node), ".world"]),
  Path = filename:join([Dir, WorldFile]),
  {ok, Fd} = file:open(Path, [binary, write]),
  io:format(Fd, "~w.~n", [State#membership.nodes]),
  file:close(Fd).
%% joining is bi-directional, as opposed to gossip which is unidirectional
%% we want to collect the list of known nodes to compute the partition map
%% which isn't necessarily the same as the list of running nodes
%% Entry point for joining: starts from a fresh vector clock for this
%% process and an empty world, then walks the partner list in join_to/5.
%% Returns {Version, World}.
join_to(Node, Servers, Partners, Hints) ->
  InitialClock = vector_clock:create(pid_to_list(self())),
  Start = {InitialClock, []},
  join_to(Node, Servers, Partners, Start, Hints).
%% Walks the partner list, sending each partner a join request and folding
%% the replies into the {Version, World} accumulator:
%%   * a failed call ({'EXIT', _}) skips that partner,
%%   * a {RemoteVersion, NewNodes, ServerList} reply stores the remote
%%     servers in the Servers table, merges the vector clocks and adds the
%%     remote nodes to World,
%%   * any other reply is logged and the partner is skipped.
%% Returns the final {Version, World} once the partner list is exhausted.
join_to(_, _, [], {Version, World}, _Hints) ->
  {Version, World};
join_to(Node, Servers, [Remote|Partners], {Version, World}, Hints) ->
  case call_join(Remote, Node, Hints) of
    {'EXIT', _} ->
      join_to(Node, Servers, Partners, {Version, World}, Hints);
    {RemoteVersion, NewNodes, ServerList} ->
      server_list_into_table(ServerList, Servers),
      %% Hints must be passed along: a four-argument call would land in
      %% join_to/4 and restart from a fresh clock and an empty world.
      join_to(Node, Servers, Partners,
              {vector_clock:merge(Version, RemoteVersion),
               lists:usort(World ++ NewNodes)},
              Hints);
    Val ->
      ?debugFmt("join_to unexpected value: ~p~n", [Val]),
      %% keep going so the caller still receives a {Version, World} pair
      join_to(Node, Servers, Partners, {Version, World}, Hints)
  end.
%% Sends a synchronous join request to the membership server addressed by
%% node:name(Remote). The whole call runs under `catch', so a failure comes
%% back as {'EXIT', Reason} instead of crashing the caller.
call_join(Remote, Node, Hints) ->
  JoinRequest = {join, Node, Hints},
  catch begin
          Target = {membership, node:name(Remote)},
          gen_server:call(Target, JoinRequest)
        end.
%% Merges a remote view into the local state.
%% Returns {equal, State} when the vector clocks compare equal. Otherwise
%% returns {merged, NewState}: node lists are unioned, remote servers are
%% added to the servers table, the clocks are merged, and the local
%% partition map is kept only when the comparison says `less' (the remote
%% map is taken in every other case).
merge_state(RemoteVersion, RemoteNodes, RemoteServerList, RemotePMap,
            #membership{nodes=LocalNodes, version=LocalVersion,
                        servers=ServerTab, partitions=LocalPMap} = State) ->
  case vector_clock:compare(RemoteVersion, LocalVersion) of
    equal ->
      {equal, State};
    Ordering ->
      MergedNodes = lists:usort(RemoteNodes ++ LocalNodes),
      server_list_into_table(RemoteServerList, ServerTab),
      MergedClock = vector_clock:merge(RemoteVersion, LocalVersion),
      MergedPMap =
        if
          Ordering =:= less -> LocalPMap;
          true -> RemotePMap
        end,
      {merged, State#membership{nodes=MergedNodes,
                                version=MergedClock,
                                partitions=MergedPMap}}
  end.
%% Sends the given state as gossip to every replication partner of Me.
fire_gossip(Me, #membership{nodes = KnownNodes} = State, Config, Info) ->
  GossipTo = fun(Partner) -> gossip_with(Me, Partner, State, Info) end,
  lists:foreach(GossipTo, replication:partners(Me, KnownNodes, Config)).
%% Unpacks the state and casts it to OtherNode. The servers table is turned
%% into a plain list first so it can be sent in a message.
gossip_with(_Me, OtherNode,
            #membership{servers=ServerTab, version=Clock,
                        nodes=KnownNodes, partitions=PartitionMap},
            Info) ->
  cast_gossip(OtherNode, Clock, KnownNodes, servers_to_list(ServerTab),
              PartitionMap, Info).
%% Casts a gossip message to the `membership' server on OtherNode.
%% When ?GOSSIP_INFO is enabled the outgoing gossip is logged first.
cast_gossip(OtherNode, Version, Nodes, ServerPacket, PMap, Info) ->
  case ?GOSSIP_INFO of
    false ->
      ok;
    _ ->
      ?debugFmt("~nfire_gossip:~n"
                "Self : ~p~n"
                "Info : ~p~n"
                "Target: ~p~n"
                , [self(), Info, OtherNode])
  end,
  Destination = {membership, OtherNode},
  Message = {gossip, Version, Nodes, ServerPacket, PMap, Info},
  gen_server:cast(Destination, Message).
%% this gets everything we know of, not just locals
%% Returns every {Partition, Pid} row of the servers table, sorted by
%% partition. The {Ref, Partition, Pid} monitor rows are left out.
servers_to_list(Servers) ->
  KeepPairs =
    fun({_Ref, _Partition, _Pid}, Acc) -> Acc;
       ({_, _} = Pair, Acc) -> [Pair | Acc]
    end,
  Pairs = ets:foldl(KeepPairs, [], Servers),
  lists:keysort(1, Pairs).
%% Inserts every {Partition, Pid} pair of ServerList into the servers
%% table, one row at a time. Returns ok.
server_list_into_table(ServerList, Servers) ->
  InsertPair = fun({_Partition, _Pid} = Pair) -> ets:insert(Servers, Pair) end,
  lists:foreach(InsertPair, ServerList).
%% Casts {remap, Node, PMap} to every registered listener so they can pick
%% up the current partition map.
publish_map_to_listeners(#membership{node=Node, listeners=Listeners,
                                     partitions=PMap}) ->
  Remap = {remap, Node, PMap},
  Notify = fun(Listener) -> gen_server:cast(Listener, Remap) end,
  lists:foreach(Notify, Listeners).
%% master: the partitions whose {Node, Partition} entry in the partition
%% map names Node, in map order.
int_partitions_for_node(Node, State, master) ->
  IsMine = fun({Owner, _}) -> Owner == Node end,
  Mine = lists:filter(IsMine, State#membership.partitions),
  [Partition || {_, Partition} <- Mine];
%% all: the master partitions of Node and of each of its replication
%% partners, combined with lists:merge/2.
int_partitions_for_node(Node, State, all) ->
  Config = configuration:get_config(),
  KnownNodes = State#membership.nodes,
  Partners = replication:partners(Node, KnownNodes, Config),
  Members = lists:flatten([Node, Partners]),
  MergeMaster =
    fun(Member, Acc) ->
        lists:merge(Acc, int_partitions_for_node(Member, State, master))
    end,
  lists:foldl(MergeMaster, [], Members).
%% @doc get the partition table/map from some remote nodes, or
%% create a new one
%% @end
%% TODO: do vector clocks need to be here, to ensure we get most
%% up-to-date table?
%% Chooses the initial partition map.
%% With no remote nodes a fresh map is created from Config#config.q, Node
%% and WorldNodes. With remote nodes the map is fetched from the first
%% remote that answers; if none does, a fresh map is created instead.
%%
%% Emptiness is decided by matching on the list shape in the function head
%% rather than by counting the elements with length/1.
get_init_pmap(Node, [], WorldNodes, Config) ->
  partitions:create_partitions(Config#config.q, Node, WorldNodes);
get_init_pmap(Node, [_|_] = RemoteNodes, WorldNodes, Config) ->
  case get_table_from_remotes(RemoteNodes) of
    {ok, Table} ->
      Table;
    _Error ->
      %% ok, we got some error, so we're gonna provide a fresh
      %% parts table. Hopefully gossip fixes this
      partitions:create_partitions(Config#config.q, Node, WorldNodes)
  end.
%% Asks each remote membership server in turn for its partition map.
%% Returns {ok, Table} from the first remote that answers with one, or
%% no_table_from_remotes when the list is exhausted. A remote whose call
%% fails or answers with anything else is skipped (best effort).
get_table_from_remotes([]) ->
  no_table_from_remotes;
get_table_from_remotes([Remote|Rest]) ->
  try gen_server:call({membership, Remote}, partitions) of
    {ok, Table} ->
      {ok, Table};
    _Other ->
      get_table_from_remotes(Rest)
  catch
    _:_ ->
      get_table_from_remotes(Rest)
  end.
%%
%% internal tests
%%
%% Single-node ring: node a owns every partition in the map.
int_parts_for_node_self_test() ->
  %% TODO: effigy mock configuration N=3
  PMap = [{a,1},{a,2},{a,3}],
  State = #membership{node=a, nodes=[a], partitions=PMap},
  Owned = int_partitions_for_node(a, State, all),
  ?assertEqual([1,2,3], Owned),
  ok.
%% Two known nodes: expects the partitions of a and of its partner b.
int_parts_for_node_partners_test() ->
  %% TODO: effigy mock configuration N=2
  PMap = [{a,1},{a,2},{b,3},{b,4},{c,5},{c,6}],
  State = #membership{node=a, nodes=[a,b], partitions=PMap},
  Owned = int_partitions_for_node(a, State, all),
  ?assertEqual([1,2,3,4], Owned),
  ok.
%% Four known nodes: expects the partitions of a, b and c but not d.
int_parts_for_node_bigring_test() ->
  %% TODO: effigy mock configuration N=3
  PMap = [{a,1},{a,2},{b,3},{b,4},{c,5},{c,6},{d,7},{d,8}],
  State = #membership{node=a, nodes=[a,b,c,d], partitions=PMap},
  Owned = int_partitions_for_node(a, State, all),
  ?assertEqual([1,2,3,4,5,6], Owned),
  ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment