Created
October 28, 2012 15:07
-
-
Save blinkov/3968847 to your computer and use it in GitHub Desktop.
RabbitMQ Erlang Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(event_server). | |
-behaviour(gen_server). | |
-author("Ivan Blinkov <ivan@blinkov.ru>"). | |
%% -------------------------------------------------------------------- | |
%% | |
%% Basically it just implements a publish/subscribe pattern for SockJS | |
%% using a single fanout RabbitMQ exchange as a message bus. | |
%% | |
%% Eventually got it all replaced with https://github.com/blinkov/sockjs-pubsub | |
%% | |
%% P.S.: By the way I had to cut off some project-specific stuff from | |
%% this file, sorry if it broke something... Was intended just as an example. | |
%% | |
%% -------------------------------------------------------------------- | |
%% -------------------------------------------------------------------- | |
%% External exports | |
%% Public API. register/1 is listed with the arity of its definition in this
%% module; exporting an arity that is not defined is rejected by the compiler.
-export([start_link/1, publish/1, register/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-include("amqp_client.hrl").
%% Server state:
%%   channel            - AMQP channel opened in init/1
%%   connection         - AMQP connection that owns the channel
%%   queue              - name of the server-named queue bound to the exchange
%%   consumer_tag       - tag returned by the broker for the basic.consume
%%   socket_connections - list of registered SockJS connections
-record(state, {channel, connection, queue, consumer_tag, socket_connections}).
%% Name of the exchange (declared as fanout in init/1) used as the message bus.
-define(EVENTS_EXCHANGE, <<"events">>).
%% ==================================================================== | |
%% External functions | |
%% ==================================================================== | |
%% Starts the event server as a linked gen_server registered locally under
%% the module name. Args is handed to init/1 unchanged.
%% Returns whatever gen_server:start_link/4 returns.
start_link(Args) ->
    ServerName = {local, ?MODULE},
    Options = [],
    gen_server:start_link(ServerName, ?MODULE, Args, Options).
%% Publishes Message to the events exchange.
%%
%% The request is sent as a cast because the {publish, Message} request is
%% implemented in handle_cast/2; sending it with gen_server:call/2 would hit
%% the catch-all handle_call/3 clause, which replies ok without publishing.
%% Message must be a term that mochijson2_fork:encode/1 accepts, since the
%% server encodes it before handing it to the channel.
%% Returns: ok (immediately; delivery is asynchronous).
publish(Message) ->
    gen_server:cast(?MODULE, {publish, Message}).
%% Registers a SockJS connection so that every message consumed from the
%% events exchange is forwarded to it.
%%
%% The request is sent as a cast because the {register, SocketConnection}
%% request is implemented in handle_cast/2; sending it with gen_server:call/2
%% would hit the catch-all handle_call/3 clause and the connection would
%% never be stored.
%% SocketConnection is expected to have the shape {sockjs_session, {Pid, _}},
%% which is what the delivery handler in handle_info/2 matches on.
%% Returns: ok (immediately; registration is asynchronous).
register(SocketConnection) ->
    gen_server:cast(?MODULE, {register, SocketConnection}).
%% ==================================================================== | |
%% Server functions | |
%% ==================================================================== | |
%% -------------------------------------------------------------------- | |
%% Function: init/1 | |
%% Description: Initiates the server | |
%% Returns: {ok, State} | | |
%% {ok, State, Timeout} | | |
%% ignore | | |
%% {stop, Reason} | |
%% -------------------------------------------------------------------- | |
%% Connects to the broker with the default network parameters, opens a
%% channel, declares the fanout events exchange, declares a server-named
%% queue bound to that exchange and starts consuming from it.
%%
%% Every step is asserted by pattern matching, so any unexpected broker reply
%% crashes init/1 and the server does not start.
%% Returns: {ok, #state{}} with an empty list of socket connections.
init(_Args) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    #'exchange.declare_ok'{} = amqp_channel:call(Channel,
        #'exchange.declare'{exchange = ?EVENTS_EXCHANGE, type = <<"fanout">>}),
    %% The queue is private to this server instance. Declaring it exclusive
    %% makes the broker delete it when the connection closes; with the default
    %% flags every (re)start would leave behind a bound queue that keeps
    %% accumulating fanout messages nobody consumes.
    #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel,
        #'queue.declare'{exclusive = true}),
    #'queue.bind_ok'{} = amqp_channel:call(Channel,
        #'queue.bind'{queue = Queue, exchange = ?EVENTS_EXCHANGE}),
    %% The calling process (this gen_server) becomes the consumer; deliveries
    %% arrive as plain messages and are handled in handle_info/2.
    #'basic.consume_ok'{consumer_tag = Tag} =
        amqp_channel:call(Channel, #'basic.consume'{queue = Queue}),
    {ok, #state{queue = Queue, channel = Channel,
                connection = Connection, consumer_tag = Tag,
                socket_connections = []}}.
%% -------------------------------------------------------------------- | |
%% Function: handle_call/3 | |
%% Description: Handling call messages | |
%% Returns: {reply, Reply, State} | | |
%% {reply, Reply, State, Timeout} | | |
%% {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, Reply, State} | (terminate/2 is called) | |
%% {stop, Reason, State} (terminate/2 is called) | |
%% -------------------------------------------------------------------- | |
%% No synchronous request is implemented: whatever is sent with
%% gen_server:call is answered with ok and the state is left untouched.
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%% -------------------------------------------------------------------- | |
%% Function: handle_cast/2 | |
%% Description: Handling cast messages | |
%% Returns: {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, State} (terminate/2 is called) | |
%% -------------------------------------------------------------------- | |
%% {publish, Message}: JSON-encodes Message and publishes it to the events
%% exchange through the channel kept in the state.
handle_cast({publish, Message}, State) ->
    Channel = State#state.channel,
    Method = #'basic.publish'{exchange = ?EVENTS_EXCHANGE},
    Payload = iolist_to_binary(mochijson2_fork:encode(Message)),
    Content = #amqp_msg{props = #'P_basic'{}, payload = Payload},
    amqp_channel:cast(Channel, Method, Content),
    {noreply, State};
%% {register, SocketConnection}: prepends the connection to the list of
%% connections that receive consumed messages.
handle_cast({register, SocketConnection}, State) ->
    Registered = State#state.socket_connections,
    NewState = State#state{socket_connections = [SocketConnection | Registered]},
    {noreply, NewState};
%% Any other cast is ignored.
handle_cast(_Msg, State) ->
    {noreply, State}.
%% -------------------------------------------------------------------- | |
%% Function: handle_info/2 | |
%% Description: Handling all non call/cast messages | |
%% Returns: {noreply, State} | | |
%% {noreply, State, Timeout} | | |
%% {stop, Reason, State} (terminate/2 is called) | |
%% -------------------------------------------------------------------- | |
%% Delivery from the broker: forwards the payload to every registered SockJS
%% connection whose process is still alive, drops the dead ones from the
%% state, then acknowledges the delivery.
handle_info({#'basic.deliver'{delivery_tag = DeliveryTag}, Delivery}, State) ->
    Relay = fun(Session) ->
        {sockjs_session, {SessionPid, _}} = Session,
        case is_process_alive(SessionPid) of
            true ->
                sockjs:send(Delivery#amqp_msg.payload, Session),
                true;
            false ->
                false
        end
    end,
    Kept = lists:filter(Relay, State#state.socket_connections),
    %% The surviving connections are stored in reverse of their previous order.
    StillAlive = lists:reverse(Kept),
    Ack = #'basic.ack'{delivery_tag = DeliveryTag},
    amqp_channel:cast(State#state.channel, Ack),
    {noreply, State#state{socket_connections = StillAlive}};
%% Confirmation that the subscription is active: nothing to do.
handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State};
%% Explicit shutdown request: stop normally (terminate/2 runs).
handle_info(shutdown, State) ->
    {stop, normal, State};
%% The consumer was cancelled: stop normally (terminate/2 runs).
handle_info(#'basic.cancel_ok'{}, State) ->
    {stop, normal, State};
%% Anything else is ignored.
handle_info(_Info, State) ->
    {noreply, State}.
%% -------------------------------------------------------------------- | |
%% Function: terminate/2 | |
%% Description: Shutdown the server | |
%% Returns: any (ignored by gen_server) | |
%% -------------------------------------------------------------------- | |
%% Cancels the consumer and closes the AMQP connection when this gen_server instance stops
%% Cleanup on stop: cancels the consumer identified by the stored consumer
%% tag on the channel, then closes the AMQP connection. Always returns ok.
terminate(_Reason, #state{connection = Conn,
                          channel = Chan,
                          consumer_tag = ConsumerTag}) ->
    Cancel = #'basic.cancel'{consumer_tag = ConsumerTag},
    amqp_channel:call(Chan, Cancel),
    amqp_connection:close(Conn),
    ok.
%% -------------------------------------------------------------------- | |
%% Func: code_change/3 | |
%% Purpose: Convert process state when code is changed | |
%% Returns: {ok, NewState} | |
%% -------------------------------------------------------------------- | |
%% Hot code upgrade hook: the state needs no conversion, so it is handed
%% back exactly as received.
code_change(_PreviousVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
%% -------------------------------------------------------------------- | |
%%% Internal functions | |
%% -------------------------------------------------------------------- |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment