Skip to content

Instantly share code, notes, and snippets.

@seanlynch
Created July 8, 2010 17:52
Show Gist options
  • Save seanlynch/468357 to your computer and use it in GitHub Desktop.
Save seanlynch/468357 to your computer and use it in GitHub Desktop.
%% @author Northscale <info@northscale.com>
%% @copyright 2009 NorthScale, Inc.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(ns_pubsub).
-behaviour(gen_event).
%% State of a handler installed by subscribe/3: `func' is invoked as
%% Fun(Event, FuncState) for every event, and its return value is stored
%% as the next `func_state'.
-record(state, {func, func_state}).
%% State of the handler installed on ns_node_disco_events by subscribe_all/3:
%% `noderefs' is a list of {Node, Ref} pairs, one per per-node subscription;
%% `name' is the event manager name subscribed to on each node; `func' and
%% `func_state' are handed to every per-node subscription.
-record(subscribe_all_state, {noderefs, name, func, func_state}).
%% gen_event callbacks
-export([code_change/3, init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2]).
%% Public API
-export([subscribe/2, subscribe/3,
subscribe_all/2, subscribe_all/3,
unsubscribe/2, unsubscribe_all/1]).
%%
%% API
%%
%% @doc Subscribe `Pid' to the event manager `Name'. Every event is forwarded
%% to `Pid' as a plain message. Returns the reference that identifies the
%% subscription (pass it to unsubscribe/2).
subscribe(Name, Pid) when is_pid(Pid) ->
    Forward = msg_fun(Pid),
    subscribe(Name, Forward, ignored).
%% @doc Install a supervised handler on the event manager `Name'. For each
%% event the handler calls `Fun(Event, State)' and keeps the return value as
%% the state for the next call. Returns the reference that identifies the
%% handler; crashes with badmatch if the handler cannot be added.
subscribe(Name, Fun, State) ->
    Ref = make_ref(),
    HandlerId = {?MODULE, Ref},
    InitArg = #state{func = Fun, func_state = State},
    ok = gen_event:add_sup_handler(Name, HandlerId, InitArg),
    Ref.
%% @doc Subscribe `Pid' to the event manager `Name' on all nodes. Events are
%% forwarded to `Pid' as plain messages. Returns the reference to pass to
%% unsubscribe_all/1.
subscribe_all(Name, Pid) when is_pid(Pid) ->
    Forward = msg_fun(Pid),
    subscribe_all(Name, Forward, ignored).
%% @doc Subscribe `Fun'/`State' to the event manager `Name' on all nodes.
%% A tracking handler is installed on ns_node_disco_events and is then asked
%% to subscribe on the nodes currently reported by
%% ns_node_disco:nodes_actual_proper/0. Returns the reference identifying the
%% tracking handler.
subscribe_all(Name, Fun, State) ->
    Ref = make_ref(),
    HandlerId = {?MODULE, Ref},
    InitArg = #subscribe_all_state{noderefs = [],
                                   name = Name,
                                   func = Fun,
                                   func_state = State},
    ok = gen_event:add_sup_handler(ns_node_disco_events, HandlerId, InitArg),
    %% The node list is read only after the handler is in place.
    Nodes = ns_node_disco:nodes_actual_proper(),
    ok = gen_event:call(ns_node_disco_events, HandlerId, {init_nodes, Nodes}),
    Ref.
%% @doc Remove the handler identified by `Ref' from the event manager `Name'.
%% The handler's terminate/2 is called with reason `unsubscribed'. Returns
%% whatever gen_event:delete_handler/3 returns.
unsubscribe(Name, Ref) ->
    HandlerId = {?MODULE, Ref},
    gen_event:delete_handler(Name, HandlerId, unsubscribed).
%% @doc Remove the tracking handler installed by subscribe_all/3.
%% Only the reference is required: the event manager name is kept in the
%% state of the handler installed on ns_node_disco_events.
unsubscribe_all(Ref) ->
    HandlerId = {?MODULE, Ref},
    gen_event:delete_handler(ns_node_disco_events, HandlerId, unsubscribed).
%%
%% gen_event callbacks
%%
%% @doc gen_event callback: the handler state is carried over unchanged
%% across code upgrades.
code_change(_FromVsn, HandlerState, _UpgradeExtra) ->
    {ok, HandlerState}.
%% @doc gen_event callback: the argument given when the handler was added is
%% used directly as the initial handler state.
init(InitialState) ->
    {ok, InitialState}.
%% @doc gen_event callback handling `{init_nodes, Nodes}'.
%% Only set initial nodes if we haven't yet received an event: when node
%% subscriptions already exist the request is acknowledged and ignored.
handle_call({init_nodes, _Ignored},
            State = #subscribe_all_state{noderefs = [_ | _]}) ->
    {ok, ok, State};
handle_call({init_nodes, InitialNodes},
            State = #subscribe_all_state{noderefs = []}) ->
    Subscribed = [{Node, subscribe_node(Node, State)} || Node <- InitialNodes],
    {ok, ok, State#subscribe_all_state{noderefs = Subscribed}}.
%% @doc gen_event callback.
%%
%% For a plain subscription (`#state{}') the user fun is called with the
%% event and the current fun state; its return value becomes the new fun
%% state.
%%
%% For a subscribe_all tracking handler (`#subscribe_all_state{}') a
%% `{ns_node_disco_events, OldNodes, Nodes}' event updates the per-node
%% subscriptions so that `noderefs' has one entry for every node in `Nodes'.
handle_event(Event, State = #state{func=Fun, func_state=FS}) ->
    NewState = Fun(Event, FS),
    {ok, State#state{func_state=NewState}};
handle_event({ns_node_disco_events, _OldNodes, Nodes},
             State = #subscribe_all_state{noderefs=NodeRefs}) ->
    %% Subscribe on nodes that have no subscription yet.
    NewNodeRefs = [{N, subscribe_node(N, State)} ||
                      N <- Nodes, not lists:keymember(N, 1, NodeRefs)],
    %% Keep the subscriptions of nodes that are still present. Keeping the
    %% ones that are NOT present (as the previous `not lists:member' did)
    %% forgot every live subscription, so the next event subscribed those
    %% nodes a second time and terminate/2 never removed the originals.
    KeepNodeRefs = [NR || NR = {N, _} <- NodeRefs, lists:member(N, Nodes)],
    {ok, State#subscribe_all_state{noderefs=NewNodeRefs ++ KeepNodeRefs}}.
%% @doc gen_event callback: any other message is ignored and the handler
%% state is left untouched.
handle_info(_Unexpected, HandlerState) ->
    {ok, HandlerState}.
%% @doc gen_event callback.
%% A subscribe_all tracking handler logs its removal and then removes every
%% per-node subscription recorded in `noderefs'. Any other handler only logs
%% its state and the reason.
terminate(Reason, #subscribe_all_state{noderefs = NodeRefs, name = Name}) ->
    error_logger:info_msg("~p removing subscribe_all handler ~p: ~p~n",
                          [?MODULE, Name, Reason]),
    DropNode = fun({Node, Ref}) -> unsubscribe({Name, Node}, Ref) end,
    lists:foreach(DropNode, NodeRefs);
terminate(Reason, State) ->
    LogArgs = [?MODULE, State, Reason],
    error_logger:info_msg("~p unsubscribed ~p: ~p~n", LogArgs).
%%
%% Internal functions
%%
%% @doc Build a subscription fun that sends each event to `Pid' as a message
%% and returns its state argument unchanged.
msg_fun(Pid) ->
    fun(Event, State) ->
        erlang:send(Pid, Event),
        State
    end.
%% @doc Subscribe the fun and fun state held in the tracking handler's state
%% to the event manager `{Name, Node}'. Returns the new subscription's
%% reference.
subscribe_node(Node, #subscribe_all_state{name = Name,
                                          func = Fun,
                                          func_state = FunState}) ->
    Target = {Name, Node},
    subscribe(Target, Fun, FunState).
%% @author Northscale <info@northscale.com>
%% @copyright 2009 NorthScale, Inc.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% Centralized time service
-module(ns_tick).
-behaviour(gen_server).
%% Name of the event manager that tick/1 notifies.
-define(EVENT_MANAGER, ns_tick_event).
%% Period, in milliseconds, given to timer:send_interval/2 in init/1.
-define(INTERVAL, 1000).
-define(SERVER, ?MODULE).
-export([start_link/0]).
%% gen_server callbacks
-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, init/1,
terminate/2]).
-export([tick/1]). % used internally via rpc
%% `time' holds the timestamp of the most recent tick that was broadcast.
-record(state, {time}).
%%
%% API
%%
start_link() ->
misc:start_singleton_gen_server(?MODULE, [], []).
%%
%% gen_server callbacks
%%
%% @doc gen_server callback: the server state is carried over unchanged
%% across code upgrades.
code_change(_FromVsn, ServerState, _UpgradeExtra) ->
    {ok, ServerState}.
%% @doc gen_server callback: arrange for a `tick' message to be delivered to
%% this process every ?INTERVAL milliseconds and start with an empty state.
init([]) ->
    %% Assert that the timer really started; silently ignoring an error here
    %% would leave a running server that never ticks.
    {ok, _TRef} = timer:send_interval(?INTERVAL, tick),
    {ok, #state{}}.
%% @doc gen_server callback. No synchronous requests are supported: the only
%% clause matches the placeholder atom `unhandled' in every position, so any
%% real call fails with function_clause.
handle_call(unhandled, unhandled, unhandled) ->
    unhandled.
%% @doc gen_server callback. No casts are supported: the only clause matches
%% the placeholder atom `unhandled' in both positions, so any real cast fails
%% with function_clause.
handle_cast(unhandled, unhandled) ->
    unhandled.
%% @doc gen_server callback for the periodic `tick' message.
%% Called once per second on the node where the gen_server runs. Takes a
%% timestamp, asks every node to run tick/1 with it, and records it in the
%% server state.
handle_info(tick, State) ->
    Now = erlang:now(),
    %% The function is rpc:eval_everywhere/3; the previous spelling
    %% (`eval_everywere') does not exist and raised undef on every tick.
    %% The call is asynchronous: no replies are collected.
    rpc:eval_everywhere(?MODULE, tick, [Now]),
    {noreply, State#state{time=Now}}.
%% @doc gen_server callback: nothing to clean up on shutdown.
terminate(_Why, _ServerState) ->
    ok.
%%
%% Internal functions
%%
%% @doc Called on all nodes via RPC: publish `{tick, Now}' to the local
%% ?EVENT_MANAGER event manager.
tick(Now) ->
    Event = {tick, Now},
    gen_event:notify(?EVENT_MANAGER, Event).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment