Skip to content

Instantly share code, notes, and snippets.

@blinkov
Created October 28, 2012 15:07
Show Gist options
  • Save blinkov/3968847 to your computer and use it in GitHub Desktop.
Save blinkov/3968847 to your computer and use it in GitHub Desktop.
RabbitMQ Erlang Example
-module(event_server).
-behaviour(gen_server).
-author("Ivan Blinkov <ivan@blinkov.ru>").
%% --------------------------------------------------------------------
%%
%% Basically it just implements a publish/subscribe pattern for SockJS
%% using a single fanout RabbitMQ exchange as a message bus.
%%
%% Eventually got it all replaced with https://github.com/blinkov/sockjs-pubsub
%%
%% P.S.: By the way I had to cut off some project-specific stuff from
%% this file, sorry if it broke something... Was intended just as an example.
%%
%% --------------------------------------------------------------------
%% --------------------------------------------------------------------
%% External exports
-export([start_link/1, publish/1, register/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-include("amqp_client.hrl").
-record(state, {channel, connection, queue, consumer_tag, socket_connections}).
-define(EVENTS_EXCHANGE, <<"events">>).
%% ====================================================================
%% External functions
%% ====================================================================
start_link(Args) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Args, []).
publish(Message) ->
gen_server:call(?MODULE, {publish, Message}).
register(SocketConnection) ->
gen_server:call(?MODULE, {register, SocketConnection}).
%% ====================================================================
%% Server functions
%% ====================================================================
%% --------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% --------------------------------------------------------------------
init(_Args) ->
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
{ok, Channel} = amqp_connection:open_channel(Connection),
#'exchange.declare_ok'{} = amqp_channel:call(Channel,
#'exchange.declare'{exchange = ?EVENTS_EXCHANGE, type = <<"fanout">>}),
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel,
#'queue.declare'{}),
#'queue.bind_ok'{} = amqp_channel:call(Channel,
#'queue.bind'{queue = Queue, exchange = ?EVENTS_EXCHANGE}),
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Queue}),
{ok, #state{queue = Queue, channel = Channel,
connection = Connection, consumer_tag = Tag,
socket_connections = []}}.
%% --------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%% --------------------------------------------------------------------
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%% --------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%% --------------------------------------------------------------------
handle_cast({publish, Message},State) ->
amqp_channel:cast(State#state.channel,
#'basic.publish'{exchange = ?EVENTS_EXCHANGE},
#amqp_msg{props = #'P_basic'{}, payload =
iolist_to_binary(mochijson2_fork:encode(Message))}),
{noreply, State};
handle_cast({register, SocketConnection},State) ->
SocketConnections = [SocketConnection | State#state.socket_connections],
{noreply,State#state{socket_connections=SocketConnections}};
handle_cast(_Msg, State) ->
{noreply, State}.
%% --------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%% --------------------------------------------------------------------
handle_info({#'basic.deliver'{delivery_tag = Tag}, Message}, State) ->
AliveSocketConnections = lists:foldl(fun(Conn, Acc) ->
{sockjs_session,{Pid, _}} = Conn,
case is_process_alive(Pid) of
true ->
sockjs:send(Message#amqp_msg.payload, Conn),
[Conn | Acc];
false ->
Acc
end
end, [], State#state.socket_connections),
amqp_channel:cast(State#state.channel, #'basic.ack'{delivery_tag = Tag}),
{noreply, State#state{socket_connections = AliveSocketConnections}};
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
handle_info(shutdown, State) ->
{stop, normal, State};
handle_info(#'basic.cancel_ok'{}, State) ->
{stop, normal, State};
handle_info(_Info, State) ->
{noreply, State}.
%% --------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%% --------------------------------------------------------------------
%% Closes the channel this gen_server instance stops
terminate(_Reason, #state{channel = Channel, consumer_tag = Tag,
connection = Connection}) ->
amqp_channel:call(Channel,#'basic.cancel'{consumer_tag = Tag}),
amqp_connection:close(Connection),
ok.
%% --------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%% --------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% --------------------------------------------------------------------
%%% Internal functions
%% --------------------------------------------------------------------
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment