Skip to content

Instantly share code, notes, and snippets.

@majek
Created February 20, 2012 13:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save majek/1869301 to your computer and use it in GitHub Desktop.
Save majek/1869301 to your computer and use it in GitHub Desktop.
diff -r 3be0091c6401 src/rabbit_stomp_client_sup.erl
--- a/src/rabbit_stomp_client_sup.erl Wed Feb 08 15:53:59 2012 +0000
+++ b/src/rabbit_stomp_client_sup.erl Wed Feb 22 11:52:02 2012 +0000
@@ -18,7 +18,7 @@
-behaviour(supervisor2).
-define(MAX_WAIT, 16#ffffffff).
--export([start_link/1, start_processor/3, init/1]).
+-export([start_link/1, start_processor/5, init/1]).
start_link(Configuration) ->
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
@@ -31,13 +31,15 @@
[rabbit_stomp_reader]}),
{ok, SupPid, ReaderPid}.
-start_processor(SupPid, Configuration, Sock) ->
+start_processor(SupPid, SendFrame, AdapterInfo, StartHeartbeatFun,
+ Configuration) ->
supervisor2:start_child(SupPid,
{rabbit_stomp_processor,
{rabbit_stomp_processor, start_link,
- [Sock,
- rabbit_heartbeat:start_heartbeat_fun(SupPid),
- Configuration]},
+ [SendFrame,
+ AdapterInfo,
+ StartHeartbeatFun,
+ Configuration]},
intrinsic, ?MAX_WAIT, worker,
[rabbit_stomp_processor]}).
diff -r 3be0091c6401 src/rabbit_stomp_processor.erl
--- a/src/rabbit_stomp_processor.erl Wed Feb 08 15:53:59 2012 +0000
+++ b/src/rabbit_stomp_processor.erl Wed Feb 22 11:52:02 2012 +0000
@@ -17,7 +17,7 @@
-module(rabbit_stomp_processor).
-behaviour(gen_server2).
--export([start_link/3, process_frame/2, flush_and_die/1]).
+-export([start_link/4, process_frame/2, flush_and_die/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2]).
@@ -27,10 +27,10 @@
-include("rabbit_stomp_prefixes.hrl").
-include("rabbit_stomp_headers.hrl").
--record(state, {socket, session_id, channel,
- connection, subscriptions, version,
- start_heartbeat_fun, pending_receipts,
- config, dest_queues, reply_queues, frame_transformer}).
+-record(state, {session_id, channel, connection, subscriptions,
+ version, start_heartbeat_fun, pending_receipts,
+ config, dest_queues, reply_queues, frame_transformer,
+ adapter_info, send_fun}).
-record(subscription, {dest_hdr, channel, multi_ack, description}).
@@ -40,9 +40,10 @@
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
-start_link(Sock, StartHeartbeatFun, Configuration) ->
- gen_server2:start_link(?MODULE, [Sock, StartHeartbeatFun, Configuration],
- []).
+start_link(SendFun, AdapterInfo, StartHeartbeatFun, Configuration) ->
+ gen_server2:start_link(?MODULE, [SendFun, AdapterInfo, StartHeartbeatFun,
+ Configuration],
+ []).
process_frame(Pid, Frame = #stomp_frame{command = "SEND"}) ->
credit_flow:send(Pid),
@@ -57,11 +58,11 @@
%% Basic gen_server2 callbacks
%%----------------------------------------------------------------------------
-init([Sock, StartHeartbeatFun, Configuration]) ->
+init([SendFun, AdapterInfo, StartHeartbeatFun, Configuration]) ->
process_flag(trap_exit, true),
+
{ok,
#state {
- socket = Sock,
session_id = none,
channel = none,
connection = none,
@@ -72,7 +73,9 @@
config = Configuration,
dest_queues = sets:new(),
reply_queues = dict:new(),
- frame_transformer = undefined},
+ frame_transformer = undefined,
+ adapter_info = AdapterInfo,
+ send_fun = SendFun},
hibernate,
{backoff, 1000, 1000, 10000}
}.
@@ -181,10 +184,10 @@
Frame,
State = #state{
channel = none,
- socket = Sock,
config = #stomp_configuration{
default_login = DefaultLogin,
- default_passcode = DefaultPasscode}}) ->
+ default_passcode = DefaultPasscode},
+ adapter_info = AdapterInfo}) ->
process_request(
fun(StateN) ->
case negotiate_version(Frame) of
@@ -193,6 +196,7 @@
Frame1 = FT(Frame),
{ok, DefaultVHost} =
application:get_env(rabbit, default_vhost),
+ {ProtoName, _} = AdapterInfo#adapter_info.protocol,
Res = do_login(
rabbit_stomp_frame:header(Frame1,
?HEADER_LOGIN,
@@ -207,7 +211,8 @@
rabbit_stomp_frame:header(Frame1,
?HEADER_HEART_BEAT,
"0,0"),
- adapter_info(Sock, Version),
+ AdapterInfo#adapter_info{
+ protocol = {ProtoName, Version}},
Version,
StateN#state{frame_transformer = FT}),
case {Res, Implicit} of
@@ -466,52 +471,6 @@
error("Bad CONNECT", "Authentication failure\n", State)
end.
-adapter_info(Sock, Version) ->
- {Addr, Port} = case rabbit_net:sockname(Sock) of
- {ok, Res} -> Res;
- _ -> {unknown, unknown}
- end,
- {PeerAddr, PeerPort} = case rabbit_net:peername(Sock) of
- {ok, Res2} -> Res2;
- _ -> {unknown, unknown}
- end,
- #adapter_info{protocol = {'STOMP', Version},
- address = Addr,
- port = Port,
- peer_address = PeerAddr,
- peer_port = PeerPort,
- additional_info = maybe_ssl_info(Sock)}.
-
-maybe_ssl_info(Sock) ->
- case rabbit_net:is_ssl(Sock) of
- true -> [{ssl, true}] ++ ssl_info(Sock) ++ ssl_cert_info(Sock);
- false -> [{ssl, false}]
- end.
-
-ssl_info(Sock) ->
- {Protocol, KeyExchange, Cipher, Hash} =
- case rabbit_net:ssl_info(Sock) of
- {ok, {P, {K, C, H}}} -> {P, K, C, H};
- {ok, {P, {K, C, H, _}}} -> {P, K, C, H};
- _ -> {unknown, unknown, unknown, unknown}
- end,
- [{ssl_protocol, Protocol},
- {ssl_key_exchange, KeyExchange},
- {ssl_cipher, Cipher},
- {ssl_hash, Hash}].
-
-ssl_cert_info(Sock) ->
- case rabbit_net:peercert(Sock) of
- {ok, Cert} ->
- [{peer_cert_issuer, list_to_binary(
- rabbit_ssl:peer_cert_issuer(Cert))},
- {peer_cert_subject, list_to_binary(
- rabbit_ssl:peer_cert_subject(Cert))},
- {peer_cert_validity, list_to_binary(
- rabbit_ssl:peer_cert_validity(Cert))}];
- _ ->
- []
- end.
do_subscribe(Destination, DestHdr, Frame,
State = #state{subscriptions = Subs,
@@ -860,18 +819,19 @@
%%--------------------------------------------------------------------
ensure_heartbeats(Heartbeats,
- State = #state{socket = Sock, start_heartbeat_fun = SHF}) ->
+ State = #state{start_heartbeat_fun = SHF,
+ send_fun = RawSendFun}) ->
[CX, CY] = [list_to_integer(X) ||
X <- re:split(Heartbeats, ",", [{return, list}])],
- SendFun = fun() -> catch rabbit_net:send(Sock, <<$\n>>) end,
+ SendFun = fun() -> RawSendFun(sync, <<$\n>>) end,
Pid = self(),
ReceiveFun = fun() -> gen_server2:cast(Pid, client_timeout) end,
{SendTimeout, ReceiveTimeout} =
{millis_to_seconds(CY), millis_to_seconds(CX)},
- SHF(Sock, SendTimeout, SendFun, ReceiveTimeout, ReceiveFun),
+ SHF(SendTimeout, SendFun, ReceiveTimeout, ReceiveFun),
{{SendTimeout * 1000 , ReceiveTimeout * 1000}, State}.
@@ -992,11 +952,8 @@
body_iolist = BodyFragments},
State).
-send_frame(Frame, State = #state{socket = Sock}) ->
- %% We ignore certain errors here, as we will be receiving an
- %% asynchronous notification of the same (or a related) fault
- %% shortly anyway. See bug 21365.
- catch rabbit_net:port_command(Sock, rabbit_stomp_frame:serialize(Frame)),
+send_frame(Frame, State = #state{send_fun = SendFun}) ->
+ SendFun(async, rabbit_stomp_frame:serialize(Frame)),
State.
send_error(Message, Detail, State) ->
diff -r 3be0091c6401 src/rabbit_stomp_processor_sock.erl
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/rabbit_stomp_processor_sock.erl Wed Feb 22 11:52:02 2012 +0000
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_stomp_processor_sock).
+-export([start_processor/3]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+%%----------------------------------------------------------------------------
+
+start_processor(SupPid, Configuration, Sock) ->
+ SendFun = fun (sync, IoData) ->
+ %% no messages emitted
+ rabbit_net:send(Sock, IoData);
+ (async, IoData) ->
+ %% {inet_reply, _, _} will appear soon
+ %% We ignore certain errors here, as we will be
+ %% receiving an asynchronous notification of the
+ %% same (or a related) fault shortly anyway. See
+ %% bug 21365.
+ catch rabbit_net:port_command(Sock, IoData)
+ end,
+
+ StartHeartbeatFun =
+ fun (SendTimeout, SendFin, ReceiveTimeout, ReceiveFun) ->
+ SHF = rabbit_heartbeat:start_heartbeat_fun(SupPid),
+ SHF(Sock, SendTimeout, SendFin, ReceiveTimeout, ReceiveFun)
+ end,
+
+ {ok, ProcessorPid} = rabbit_stomp_client_sup:start_processor(
+ SupPid, SendFun, adapter_info(Sock),
+ StartHeartbeatFun, Configuration),
+ {ok, ProcessorPid}.
+
+
+adapter_info(Sock) ->
+ {Addr, Port} = case rabbit_net:sockname(Sock) of
+ {ok, Res} -> Res;
+ _ -> {unknown, unknown}
+ end,
+ {PeerAddr, PeerPort} = case rabbit_net:peername(Sock) of
+ {ok, Res2} -> Res2;
+ _ -> {unknown, unknown}
+ end,
+ #adapter_info{protocol = {'STOMP', 0},
+ address = Addr,
+ port = Port,
+ peer_address = PeerAddr,
+ peer_port = PeerPort,
+ additional_info = maybe_ssl_info(Sock)}.
+
+maybe_ssl_info(Sock) ->
+ case rabbit_net:is_ssl(Sock) of
+ true -> [{ssl, true}] ++ ssl_info(Sock) ++ ssl_cert_info(Sock);
+ false -> [{ssl, false}]
+ end.
+
+ssl_info(Sock) ->
+ {Protocol, KeyExchange, Cipher, Hash} =
+ case rabbit_net:ssl_info(Sock) of
+ {ok, {P, {K, C, H}}} -> {P, K, C, H};
+ {ok, {P, {K, C, H, _}}} -> {P, K, C, H};
+ _ -> {unknown, unknown, unknown, unknown}
+ end,
+ [{ssl_protocol, Protocol},
+ {ssl_key_exchange, KeyExchange},
+ {ssl_cipher, Cipher},
+ {ssl_hash, Hash}].
+
+ssl_cert_info(Sock) ->
+ case rabbit_net:peercert(Sock) of
+ {ok, Cert} ->
+ [{peer_cert_issuer, list_to_binary(
+ rabbit_ssl:peer_cert_issuer(Cert))},
+ {peer_cert_subject, list_to_binary(
+ rabbit_ssl:peer_cert_subject(Cert))},
+ {peer_cert_validity, list_to_binary(
+ rabbit_ssl:peer_cert_validity(Cert))}];
+ _ ->
+ []
+ end.
diff -r 3be0091c6401 src/rabbit_stomp_reader.erl
--- a/src/rabbit_stomp_reader.erl Wed Feb 08 15:53:59 2012 +0000
+++ b/src/rabbit_stomp_reader.erl Wed Feb 22 11:52:02 2012 +0000
@@ -34,7 +34,7 @@
receive
{go, Sock0, SockTransform} ->
{ok, Sock} = SockTransform(Sock0),
- {ok, ProcessorPid} = rabbit_stomp_client_sup:start_processor(
+ {ok, ProcessorPid} = rabbit_stomp_processor_sock:start_processor(
SupPid, Configuration, Sock),
{ok, ConnStr} = rabbit_net:connection_string(Sock, inbound),
log(info, "accepting STOMP connection ~p (~s)~n",
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment