Skip to content

anonymous /gist:6cc6099af6c2255282e8 secret
Created

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
%%%
%%%
%%%
-module (module_name).
-behaviour(gen_server).
-include_lib("amqp_client/include/amqp_client.hrl").
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% Internal state
-record(state, {
connection :: pid(), % rabbitMQ connection
connection_ref :: reference(), % connection monitor ref
channel :: pid(), % rabbitMQ channel
channel_ref :: reference(), % channel monitor ref
% ---
rabbitmq_restart_timeout = 5000 :: pos_integer(), % restart timeout
}).
%%==========
%% API
%%
%% @doc Starts the server
%% @end
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init(_Args) ->
gen_server:cast(self(), connect),
{ok, #state{
rabbitmq_restart_timeout = appenv:get(?APP, rabbitmq_restart_timeout, 5000)
}}.
%%
%% @doc Handling all messages from RabbitMQ
%% @end
handle_info({#'basic.deliver'{delivery_tag = Tag, routing_key = _Queue}, #amqp_msg{props = #'P_basic'{reply_to = ReplyTo}, payload = Body}} = _Msg, #state{channel = Channel} = State) ->
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
try
Message = binary_to_term(Body),
?INFO("You go: ~p", [Message])
catch
_:_ ->
?ERR("Cannot parse message from shop: ~p", [Body]),
error
end,
{noreply, State};
handle_info({'DOWN', ConnectionRef, process, Connection, Reason}, #state{connection = Connection, connection_ref = ConnectionRef} = State) ->
?WARN("AMQP connection error: ~p", [Reason]),
restart_me(State);
handle_info({'DOWN', ChannelRef, process, Channel, Reason}, #state{channel = Channel, channel_ref = ChannelRef} = State) ->
?WARN("AMQP channel error: ~p", [Reason]),
restart_me(State);
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
handle_info(#'basic.cancel'{}, State) ->
{noreply, State};
handle_info(_Info, State) ->
?ERR("Unsupported info message: ~p", [_Info]),
{noreply, State}.
handle_cast(connect, State) ->
% connection parameters
AMQP_Param = #amqp_params_network{
host = appenv:get(?APP, rabbitmq_host, "localhost"),
username = appenv:get(?APP, rabbitmq_username, <<"max">>),
password = appenv:get(?APP, rabbitmq_password, <<"password">>),
port = appenv:get(?APP, rabbitmq_port, 5672),
virtual_host = appenv:get(?APP, rabbitmq_vhost, <<"vhost">>),
heartbeat = appenv:get(?APP, rabbitmq_heartbeat, 5)
},
% connection...
case amqp_connection:start(AMQP_Param) of
{ok, Connection} ->
% start connection monitor
ConnectionRef = erlang:monitor(process, Connection),
case amqp_connection:open_channel(Connection) of
{ok, Channel} ->
ChannelRef = erlang:monitor(process, Channel),
%
% Here you have to subscribe to queues you want to listen
%
%
{noreply, State#state{
connection = Connection,
connection_ref = ConnectionRef,
channel = Channel,
channel_ref = ChannelRef
}};
_Reason2 ->
% channel error
?WARN("AMQP channel error: ~p", [_Reason2]),
restart_me(State)
end;
_Reason1 ->
% connection error
?WARN("AMQP connection error: ~p", [_Reason1]),
restart_me(State)
end;
handle_cast(_Msg, State) ->
?ERR("Unsupported cast message: ~p", [_Msg]),
{noreply, State}.
handle_call(_Request, _From, State) ->
?ERR("Unsupported call message: ~p, from: ~p", [_Request, _From]),
{reply, ok, State}.
terminate(_Reason, #state{connection = Connection, channel = Channel} = _State) ->
if
is_pid(Channel) -> amqp_channel:close(Channel);
true -> pass
end,
if
is_pid(Connection) -> amqp_connection:close(Connection);
true -> pass
end,
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
restart_me(#state{rabbitmq_restart_timeout = Wait} = State) ->
?WARN("AMQP is down, will try to reconnect in ~p seconds.", [(Wait/1000)]),
timer:sleep(Wait), % Sleep for Wait seconds
{stop, error, State}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.