Skip to content

Instantly share code, notes, and snippets.

@l04m33
Created April 27, 2012 09:41
Show Gist options
  • Save l04m33/2507853 to your computer and use it in GitHub Desktop.
Save l04m33/2507853 to your computer and use it in GitHub Desktop.
a callback server
-module(gen_callback_server).
-behaviour(gen_server).
-include("gen_callback_server.hrl").
-export([behaviour_info/1]).
-export([
start_link/4,
start_link/3,
start/4,
start/3,
do_sync/2,
do_sync/3,
do_async/2,
do_async/3,
do/4,
do/5,
reply_cb/1,
receive_cb/2,
client_reply_cb/1,
mixed_reply_cb/1
]).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% Internal gen_server state wrapping the callback module's own state.
-record(state,
{
%% Value taken from the `callback_opts' entry of the start options
%% (`[]' when that entry is absent).
options :: [atom() | tuple()],
%% `{Name, node()}' when started with a name, otherwise the server's own pid;
%% handed to reply callbacks as the `sent_from' field of the event.
name :: server_ref(),
%% Module implementing init/1, handle_action/2, handle_info/2,
%% terminate/2 and code_change/3.
cb_module :: module(),
%% Opaque state owned by the callback module.
cb_state :: any()
}).
-define(CB(State), (State#state.cb_module)).
-define(DEFAULT_RPC_TIMEOUT, 5000).
%% @doc Behaviour descriptor for modules declaring
%% `-behaviour(gen_callback_server)'.
%%
%% `behaviour_info(callbacks)' returns the `{Function, Arity}' pairs a callback
%% module is expected to export. Any other key returns `undefined', which is
%% the conventional answer for information a behaviour does not provide,
%% rather than failing with `function_clause'.
behaviour_info(callbacks) ->
    [
        {init, 1},
        {handle_action, 2},
        {handle_info, 2},
        {terminate, 2},
        {code_change, 3}
    ];
behaviour_info(_Other) ->
    undefined.
%% @doc Starts an unregistered callback server via gen_server:start_link/3.
%%
%% The `callback_opts' entry (if present) is removed from `Options' and passed
%% to init/1 wrapped in a 1-tuple, right after `CBModule'; the remaining
%% options are forwarded to gen_server. Returns gen_server's result unchanged.
start_link(CBModule, CBArgs, Options) ->
    {CBOpts, GenServerOpts} = get_cb_options(Options),
    InitArgs = [CBModule, {CBOpts} | CBArgs],
    gen_server:start_link(?MODULE, InitArgs, GenServerOpts).
%% @doc Starts a registered callback server via gen_server:start_link/4.
%%
%% `Name' is handed to gen_server as the registration name and is also passed
%% to init/1 (paired with the extracted `callback_opts' value) so the server
%% can record it. Returns gen_server's result unchanged.
start_link(Name, CBModule, CBArgs, Options) ->
    {CBOpts, GenServerOpts} = get_cb_options(Options),
    InitArgs = [CBModule, {Name, CBOpts} | CBArgs],
    gen_server:start_link(Name, ?MODULE, InitArgs, GenServerOpts).
%% @doc Starts an unregistered callback server via gen_server:start/3
%% (same argument handling as start_link/3, but using the non-linking
%% gen_server entry point).
start(CBModule, CBArgs, Options) ->
    {CBOpts, GenServerOpts} = get_cb_options(Options),
    InitArgs = [CBModule, {CBOpts} | CBArgs],
    gen_server:start(?MODULE, InitArgs, GenServerOpts).
%% @doc Starts a registered callback server via gen_server:start/4
%% (same argument handling as start_link/4, but using the non-linking
%% gen_server entry point).
start(Name, CBModule, CBArgs, Options) ->
    {CBOpts, GenServerOpts} = get_cb_options(Options),
    InitArgs = [CBModule, {Name, CBOpts} | CBArgs],
    gen_server:start(Name, ?MODULE, InitArgs, GenServerOpts).
%% @doc Same as do/5, using ?DEFAULT_RPC_TIMEOUT as the timeout handed to the
%% local callback.
do(Target, Msg, RemoteCBFunc, LocalCBFunc) ->
    DefaultTimeout = ?DEFAULT_RPC_TIMEOUT,
    do(Target, Msg, RemoteCBFunc, LocalCBFunc, DefaultTimeout).
%% @doc Sends `Msg' to `Target' as a cast and then runs the local callback.
%%
%% The cast carries the caller's pid, a fresh reference, the message and
%% `RemoteCBFunc' (which the server runs after handling the message, when the
%% callback module produces a reply argument). `LocalCBFunc' is then invoked
%% in the calling process with the same reference and `Timeout'; its result is
%% the result of this function.
do(Target, Msg, RemoteCBFunc, LocalCBFunc, Timeout) ->
    Ref = make_ref(),
    Request = {gen_callback_msg, self(), Ref, Msg, RemoteCBFunc},
    %% TODO: monitor the target process even though we're using gen_server:cast(...)
    gen_server:cast(Target, Request),
    call_recv_cb(LocalCBFunc, Ref, Timeout).
%% @doc Remote callback that sends the event's `cb_arg' back to the originator.
%%
%% The message has the form `{gen_callback_reply, MsgRef, Reply, Replier}' and
%% goes to the process stored in `reply_to'. Always returns `ok'.
reply_cb(CBEvent) ->
    Message = {
        gen_callback_reply,
        CBEvent#cb_event.msg_ref,
        CBEvent#cb_event.cb_arg,
        CBEvent#cb_event.sent_from
    },
    CBEvent#cb_event.reply_to ! Message,
    ok.
%% @doc Local callback that waits for the reply matching the event's reference.
%%
%% Returns the reply term from the first `gen_callback_reply' message tagged
%% with this event's `msg_ref'. Raises `error(timeout)' if no such message
%% arrives within `Timeout'.
receive_cb(CBEvent, Timeout) ->
    Ref = CBEvent#cb_event.msg_ref,
    receive
        {gen_callback_reply, Ref, Answer, _From} ->
            Answer
    after Timeout ->
        error(timeout)
    end.
%% @doc Remote callback that forwards the handler's result to `Senders'.
%%
%% The event context must be `{Senders, Protocol}'. Depending on `cb_arg':
%%   {ok, Content}  - encode with the protocol module's write/2 and pass the
%%                    packet to lib_send:send/2;
%%   {error, quiet} - do nothing and return `void';
%%   {error, Code}  - report through mod_err:send_error/4.
client_reply_cb(CBEvent) ->
    {Receivers, ProtoId} = CBEvent#cb_event.context,
    case CBEvent#cb_event.cb_arg of
        {error, quiet} ->
            void;
        {error, Code} ->
            mod_err:send_error(pid, Receivers, ProtoId, Code);
        {ok, Payload} ->
            Codec = get_pt_mod(ProtoId),
            {ok, Bin} = Codec:write(ProtoId, Payload),
            lib_send:send(Receivers, Bin)
    end.
%% @doc Remote callback that replies to the originator and, where applicable,
%% also forwards data to `Senders'.
%%
%% The event context must be `{Senders, Protocol}'. Depending on `cb_arg':
%%   {ok, Rep}            - reply `Rep' to the originator only;
%%   {ok, Rep, ClientRep} - reply `Rep', then encode `ClientRep' and pass the
%%                          packet to lib_send:send/2;
%%   {error, quiet}       - reply `ok' to the originator;
%%   {error, Code}        - reply `ok', then report through
%%                          mod_err:send_error/4.
mixed_reply_cb(CBEvent) ->
    {Receivers, ProtoId} = CBEvent#cb_event.context,
    case CBEvent#cb_event.cb_arg of
        {ok, ServerReply} ->
            reply_cb(CBEvent#cb_event{cb_arg = ServerReply});
        {ok, ServerReply, ClientPayload} ->
            reply_cb(CBEvent#cb_event{cb_arg = ServerReply}),
            Codec = get_pt_mod(ProtoId),
            {ok, Bin} = Codec:write(ProtoId, ClientPayload),
            lib_send:send(Receivers, Bin);
        {error, quiet} ->
            %% TODO: replace this `ok' with a more appropriate return value
            reply_cb(CBEvent#cb_event{cb_arg = ok});
        {error, Code} ->
            reply_cb(CBEvent#cb_event{cb_arg = ok}),
            mod_err:send_error(pid, Receivers, ProtoId, Code)
    end.
%% @doc Synchronous request with the default timeout (?DEFAULT_RPC_TIMEOUT).
%% See do_sync/3.
do_sync(Target, Msg) ->
    do_sync(Target, Msg, ?DEFAULT_RPC_TIMEOUT).
%% @doc Synchronous request: the server runs reply_cb/1 to send its result
%% back, and the caller blocks in receive_cb/2 until that reply arrives.
%% Raises `error(timeout)' when no reply shows up within `Timeout'.
do_sync(Target, Msg, Timeout) ->
    RemoteCB = fun reply_cb/1,
    LocalCB = fun receive_cb/2,
    do(Target, Msg, RemoteCB, LocalCB, Timeout).
%% @doc Fire-and-forget request: no remote callback and no local callback.
%% Returns `ok' right after the cast has been issued.
do_async(Target, Msg) ->
    do_async(Target, Msg, none).
%% @doc Asynchronous request: `RemoteCBFunc' is run by the server, but the
%% caller does not wait for anything (local callback is `none', so the
%% timeout value is never used). Returns `ok'.
do_async(Target, Msg, RemoteCBFunc) ->
    do(Target, Msg, RemoteCBFunc, none, ?DEFAULT_RPC_TIMEOUT).
%% gen_server callbacks
%% @doc gen_server init callback.
%%
%% `Args' is `[CBModule, NameOpts | CBArgs]' as built by start/3,4 and
%% start_link/3,4, where `NameOpts' is `{Name, Opts}' for named servers and
%% `{Opts}' otherwise. `CBModule:init(CBArgs)' is called and its state wrapped
%% in a #state{} record.
%%
%% Both `{ok, CBState}' and `{ok, CBState, Extra}' (the third element, e.g. a
%% timeout, is forwarded to gen_server untouched) are wrapped; without the
%% second form the raw callback state would be stored as the server state and
%% every later callback would fail on the record access. Any other return
%% value from the callback module is passed through unchanged.
init(Args) ->
    [CBModule, NameOpts | CBArgs] = Args,
    case CBModule:init(CBArgs) of
        {ok, CBState} ->
            {ok, init_state(NameOpts, CBModule, CBState)};
        {ok, CBState, Extra} ->
            {ok, init_state(NameOpts, CBModule, CBState), Extra};
        Other ->
            Other
    end.

%% Builds the initial #state{}: named servers record `{Name, node()}',
%% unnamed ones record their own pid.
init_state({Name, Opts}, CBModule, CBState) ->
    #state{
        name = {Name, node()},
        options = Opts,
        cb_module = CBModule,
        cb_state = CBState
    };
init_state({Opts}, CBModule, CBState) ->
    #state{
        name = self(),
        options = Opts,
        cb_module = CBModule,
        cb_state = CBState
    }.
%% @doc gen_server call handler. Calls are deliberately not supported: the
%% request is ignored and no reply is ever sent, so the state is returned
%% unchanged with `noreply'.
handle_call(_Request, _Caller, ServerState) ->
    %% Intentionally left blank....
    {noreply, ServerState}.
%% @doc Handles a `gen_callback_msg' cast produced by do/5.
%%
%% The payload is passed to the callback module's handle_action/2. When the
%% result carries a callback argument (`reply' tuples and the 4-element `stop'
%% tuple) the remote callback from the message is run with it before the
%% gen_server result is returned. The new callback state is always stored back
%% into the server state; timeouts and stop reasons are forwarded as-is.
handle_cast({gen_callback_msg, Originator, MsgRef, CBMsg, CBFunc}, State) ->
    Mod = State#state.cb_module,
    RunRemoteCB =
        fun(CBArg) ->
            call_rep_cb(CBFunc, Originator, CBArg, MsgRef, State#state.name)
        end,
    case Mod:handle_action(CBMsg, State#state.cb_state) of
        {noreply, CBState} ->
            {noreply, State#state{cb_state = CBState}};
        {noreply, CBState, Timeout} ->
            {noreply, State#state{cb_state = CBState}, Timeout};
        {reply, CBArg, CBState} ->
            RunRemoteCB(CBArg),
            {noreply, State#state{cb_state = CBState}};
        {reply, CBArg, CBState, Timeout} ->
            RunRemoteCB(CBArg),
            {noreply, State#state{cb_state = CBState}, Timeout};
        {stop, Reason, CBState} ->
            {stop, Reason, State#state{cb_state = CBState}};
        {stop, Reason, CBArg, CBState} ->
            RunRemoteCB(CBArg),
            {stop, Reason, State#state{cb_state = CBState}}
    end.
%% @doc Delegates raw messages to the callback module's handle_info/2 and
%% stores the callback state it returns back into the server state.
handle_info(Info, State) ->
    Mod = State#state.cb_module,
    case Mod:handle_info(Info, State#state.cb_state) of
        {stop, Reason, CBState} ->
            {stop, Reason, State#state{cb_state = CBState}};
        {noreply, CBState, Timeout} ->
            {noreply, State#state{cb_state = CBState}, Timeout};
        {noreply, CBState} ->
            {noreply, State#state{cb_state = CBState}}
    end.
%% @doc Delegates termination to the callback module, passing it the
%% termination reason and its own state. Returns the callback's result.
terminate(Reason, State) ->
    Mod = State#state.cb_module,
    Mod:terminate(Reason, State#state.cb_state).
%% @doc Delegates code upgrades to the callback module and stores the
%% converted callback state. The callback must return `{ok, NewCBState}';
%% anything else fails the match.
code_change(OldVsn, State, Extra) ->
    Mod = State#state.cb_module,
    {ok, UpgradedCBState} = Mod:code_change(OldVsn, State#state.cb_state, Extra),
    {ok, State#state{cb_state = UpgradedCBState}}.
%% Local functions
%% Splits the `callback_opts' entry out of a start option list.
%% Returns `{CallbackOpts, RemainingOptions}'; `CallbackOpts' is `[]' and the
%% list is returned untouched when no such entry exists. Only the first
%% `callback_opts' tuple is removed.
get_cb_options(Options) ->
    case lists:keytake(callback_opts, 1, Options) of
        false ->
            {[], Options};
        {value, {callback_opts, CBOpts}, Remaining} ->
            {CBOpts, Remaining}
    end.
%% Runs the caller-side callback for a request.
%% `RecvCBFunc' is `none' (returns `ok' without waiting), a fun of arity 2, or
%% `{Fun, Context}' where the context is stored in the event before the call.
%% The fun receives a #cb_event{} carrying `MsgRef', plus `Timeout'.
%% Anything else raises `error(invalid_callback)'.
call_recv_cb(RecvCBFunc, MsgRef, Timeout) ->
    Event = #cb_event{msg_ref = MsgRef},
    case RecvCBFunc of
        none ->
            ok;
        Fun when is_function(Fun, 2) ->
            Fun(Event, Timeout);
        {Fun, Ctx} when is_function(Fun, 2) ->
            Fun(Event#cb_event{context = Ctx}, Timeout);
        _ ->
            error(invalid_callback)
    end.
%% Runs the server-side callback for a handled request.
%% Builds a #cb_event{} from the originator pid, the callback argument, the
%% message reference and this server's name, then dispatches on `RepCBFunc':
%% `none' (returns `void'), a fun of arity 1, or `{Fun, Context}' where the
%% context is stored in the event first. Anything else raises
%% `error(invalid_callback)'.
call_rep_cb(RepCBFunc, Originator, CBArg, MsgRef, SelfName) ->
    Event = #cb_event{
        msg_ref = MsgRef,
        reply_to = Originator,
        sent_from = SelfName,
        cb_arg = CBArg
    },
    case RepCBFunc of
        none ->
            void;
        Fun when is_function(Fun, 1) ->
            Fun(Event);
        {Fun, Ctx} when is_function(Fun, 1) ->
            Fun(Event#cb_event{context = Ctx});
        _ ->
            error(invalid_callback)
    end.
%% Derives the protocol module name from an integer protocol id: the atom
%% `pt_' followed by the first two characters of the id's decimal form
%% (e.g. 12345 -> pt_12).
get_pt_mod(Protocol) ->
    Digits = integer_to_list(Protocol),
    Prefix = lists:sublist(Digits, 2),
    list_to_atom("pt_" ++ Prefix).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment