Skip to content

Instantly share code, notes, and snippets.

@tonysun83
Created October 2, 2014 05:24
Show Gist options
  • Save tonysun83/e2a1e3f0cd0764b62bcf to your computer and use it in GitHub Desktop.
Save tonysun83/e2a1e3f0cd0764b62bcf to your computer and use it in GitHub Desktop.
diff --git a/src/rexi.erl b/src/rexi.erl
index 6d0712c..c853ce9 100644
--- a/src/rexi.erl
+++ b/src/rexi.erl
@@ -11,7 +11,7 @@
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-
+
-module(rexi).
-export([start/0, stop/0, restart/0]).
-export([cast/2, cast/3, cast/4, kill/2]).
@@ -21,24 +21,24 @@
-export([stream_start/1, stream_cancel/1]).
-export([stream/1, stream/2, stream/3, stream_ack/1, stream_ack/2]).
-export([stream2/1, stream2/2, stream2/3, stream_last/1, stream_last/2]).
-
+
-include("rexi.hrl").
-
+
start() ->
application:start(rexi).
-
+
stop() ->
application:stop(rexi).
-
+
restart() ->
stop(), start().
-
-
+
+
%% @equiv cast(Node, self(), MFA)
-spec cast(node(), {atom(), atom(), list()}) -> reference().
cast(Node, MFA) ->
cast(Node, self(), MFA).
-
+
%% @doc Executes apply(M, F, A) on Node.
%% You might want to use this instead of rpc:cast/4 for two reasons. First,
%% the Caller pid and the returned reference are inserted into the remote
@@ -52,7 +52,7 @@ cast(Node, Caller, MFA) ->
Msg = cast_msg({doit, {Caller, Ref}, get(nonce), MFA}),
rexi_utils:send(rexi_utils:server_pid(Node), Msg),
Ref.
-
+
%% @doc Executes apply(M, F, A) on Node.
%% This version accepts a sync option which uses the erlang:send/2 call
%% directly in process instead of deferring to a spawned process if
@@ -69,19 +69,19 @@ cast(Node, Caller, MFA, Options) ->
false ->
cast(Node, Caller, MFA)
end.
-
+
%% @doc Sends an async kill signal to the remote process associated with Ref.
%% No rexi_EXIT message will be sent.
-spec kill(node(), reference()) -> ok.
kill(Node, Ref) ->
rexi_utils:send(rexi_utils:server_pid(Node), cast_msg({kill, Ref})),
ok.
-
+
%% @equiv async_server_call(Server, self(), Request)
-spec async_server_call(pid() | {atom(),node()}, any()) -> reference().
async_server_call(Server, Request) ->
async_server_call(Server, self(), Request).
-
+
%% @doc Sends a properly formatted gen_server:call Request to the Server and
%% returns the reference which the Server will include in its reply. The
%% function acts more like cast() than call() in that the server process
@@ -92,17 +92,17 @@ async_server_call(Server, Caller, Request) ->
Ref = make_ref(),
rexi_utils:send(Server, {'$gen_call', {Caller,Ref}, Request}),
Ref.
-
+
%% @doc convenience function to reply to the original rexi Caller.
-spec reply(any()) -> any().
reply(Reply) ->
{Caller, Ref} = get(rexi_from),
erlang:send(Caller, {Ref,Reply}).
-
+
%% @equiv sync_reply(Reply, 300000)
sync_reply(Reply) ->
sync_reply(Reply, 300000).
-
+
%% @doc convenience function to reply to caller and wait for response. Message
%% is of the form {OriginalRef, {self(),reference()}, Reply}, which enables the
%% original caller to respond back.
@@ -116,11 +116,11 @@ sync_reply(Reply, Timeout) ->
after Timeout ->
timeout
end.
-
+
%% @equiv stream_init(300000)
stream_init() ->
stream_init(300000).
-
+
%% @doc Initialize an RPC stream that involves sending multiple
%% messages back to the coordinator.
%%
@@ -130,20 +130,25 @@ stream_init() ->
%% `erlang:exit/1`.
-spec stream_init(pos_integer()) -> ok.
stream_init(Timeout) ->
+ twig:log(notice,"STREAM_INIT CALLED, SELF PID: ~p", [self()]), % format args must be a list, as in the other twig:log calls
case sync_reply(rexi_STREAM_INIT, Timeout) of
rexi_STREAM_START ->
+ twig:log(notice,"STREAM_INIT sync_reply ok, SELF PID: ~p", [self()]), % coordinator accepted this worker
ok;
rexi_STREAM_CANCEL ->
+ twig:log(notice, "CANCEL DURING STREAM_INIT CALL"), % coordinator declined this worker; exit quietly below
exit(normal);
timeout ->
+ twig:log(notice, "TIME OUT DURING STREAM_INIT CALL"), % no answer within Timeout ms; counted in stats below
couch_stats:increment_counter(
[rexi, streams, timeout, init_stream]
),
exit(normal);
Else ->
+ twig:log(notice, "ELSE DURING STREAM_INIT CALL"), % unexpected reply; it is carried in the exit reason below
exit({invalid_stream_message, Else})
end.
-
+
%% @doc Start a worker stream
%%
%% If a coordinator wants to continue using a streaming worker it
@@ -152,8 +157,9 @@ stream_init(Timeout) ->
%% the worker in the rexi_STREAM_INIT message.
-spec stream_start({pid(), any()}) -> ok.
stream_start({Pid, _Tag}=From) when is_pid(Pid) ->
+ twig:log(notice,"STREAM_START CALLED, SELF PID: ~p, FROM: ~p", [self(),From]), % trace the calling pid and the From tuple before replying rexi_STREAM_START
gen_server:reply(From, rexi_STREAM_START).
-
+
%% @doc Cancel a worker stream
%%
%% If a coordinator decideds that a worker is not going to be part
@@ -162,16 +168,17 @@ stream_start({Pid, _Tag}=From) when is_pid(Pid) ->
%% rexi_STREAM_INIT message.
-spec stream_cancel({pid(), any()}) -> ok.
stream_cancel({Pid, _Tag}=From) when is_pid(Pid) ->
+ twig:log(notice,"STREAM_CANCEL CALLED, SELF PID: ~p, FROM: ~p", [self(),From]), % trace the calling pid and the From tuple before replying rexi_STREAM_CANCEL
gen_server:reply(From, rexi_STREAM_CANCEL).
-
+
-%% @equiv stream(Msg, 100, 300000)
+%% @equiv stream(Msg, 10, 300000)
stream(Msg) ->
stream(Msg, 10, 300000).
-
+
%% @equiv stream(Msg, Limit, 300000)
stream(Msg, Limit) ->
stream(Msg, Limit, 300000).
-
+
%% @doc convenience function to stream messages to caller while blocking when
%% a specific number of messages are outstanding. Message is of the form
%% {OriginalRef, self(), Reply}, which enables the original caller to ack.
@@ -187,15 +194,15 @@ stream(Msg, Limit, Timeout) ->
couch_stats:increment_counter([rexi, streams, timeout, stream]),
exit(normal)
end.
-
+
%% @equiv stream2(Msg, 10, 300000)
stream2(Msg) ->
stream2(Msg, 10, 300000).
-
+
%% @equiv stream2(Msg, Limit, 300000)
stream2(Msg, Limit) ->
stream2(Msg, Limit, 300000).
-
+
%% @doc Stream a message back to the coordinator. It limits the
%% number of unacked messsages to Limit and throws a timeout error
%% if it doesn't receive an ack in Timeout milliseconds. This
@@ -211,33 +218,34 @@ stream2(Msg, Limit, Timeout) ->
erlang:send(Caller, {Ref, self(), Msg}),
ok
catch throw:timeout ->
+ twig:log(notice, "Timed Out in Stream2"),
couch_stats:increment_counter([rexi, streams, timeout, stream]),
exit(normal)
end.
-
+
%% @equiv stream_last(Msg, 300000)
stream_last(Msg) ->
stream_last(Msg, 300000).
-
+
%% @doc Send the last message in a stream. This difference between
%% this and stream is that it uses rexi:reply/1 which doesn't include
%% the worker pid and doesn't wait for a response from the controller.
stream_last(Msg, Timeout) ->
maybe_init_stream(Timeout),
rexi:reply(Msg).
-
+
%% @equiv stream_ack(Client, 1)
stream_ack(Client) ->
erlang:send(Client, {rexi_ack, 1}).
-
+
%% @doc Ack streamed messages
stream_ack(Client, N) ->
erlang:send(Client, {rexi_ack, N}).
-
+
%% internal functions %%
-
+
cast_msg(Msg) -> {'$gen_cast', Msg}.
-
+
maybe_init_stream(Timeout) ->
case get(rexi_STREAM_INITED) of
true ->
@@ -245,13 +253,15 @@ maybe_init_stream(Timeout) ->
_ ->
init_stream(Timeout)
end.
-
+
init_stream(Timeout) ->
case sync_reply(rexi_STREAM_INIT, Timeout) of
rexi_STREAM_START ->
+ twig:log(notice,"INIT_STREAM syn_reply returns rexi_STREAM_START: ~p", [self()]),
put(rexi_STREAM_INITED, true),
ok;
rexi_STREAM_CANCEL ->
+ twig:log(notice,"INIT_STREAM syn_reply returns rexi_STREAM_CANCEL, exit: ~p", [self()]),
exit(normal);
timeout ->
couch_stats:increment_counter(
@@ -261,7 +271,7 @@ init_stream(Timeout) ->
Else ->
exit({invalid_stream_message, Else})
end.
-
+
maybe_wait(Limit, Timeout) ->
case get(rexi_unacked) of
undefined ->
@@ -271,7 +281,7 @@ maybe_wait(Limit, Timeout) ->
Count ->
drain_acks(Count)
end.
-
+
wait_for_ack(Count, Timeout) ->
receive
{rexi_ack, N} -> drain_acks(Count-N)
@@ -279,7 +289,7 @@ wait_for_ack(Count, Timeout) ->
couch_stats:increment_counter([rexi, streams, timeout, wait_for_ack]),
throw(timeout)
end.
-
+
drain_acks(Count) when Count < 0 ->
erlang:error(mismatched_rexi_ack);
drain_acks(Count) ->
@@ -287,4 +297,4 @@ drain_acks(Count) ->
{rexi_ack, N} -> drain_acks(Count-N)
after 0 ->
{ok, Count}
- end.
+ end.
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment