secret
Last active

  • Download Gist
gistfile1.erl
Erlang
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
%%%
%%% RabbitMQ consumer skeleton implemented as a gen_server.
%%%
%%% The server casts `connect' to itself from init/1, opens an AMQP
%%% connection and a channel, and monitors both processes. When either
%%% of them goes down (or cannot be opened) the server waits for
%%% `rabbitmq_restart_timeout' and then stops.
%%%
%%% NOTE(review): the macros ?APP, ?INFO, ?WARN and ?ERR are used below
%%% but are not defined in this file - presumably they come from an
%%% include that is missing here; confirm before compiling.
%%% NOTE(review): `module_name' looks like a placeholder - presumably it
%%% has to be renamed to match the file name.
%%%
-module (module_name).
 
-behaviour(gen_server).
 
-include_lib("amqp_client/include/amqp_client.hrl").
 
-export([start_link/0]).
 
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
%% Internal state of the server.
%% The connection/channel fields are `undefined' until the `connect'
%% cast has been handled successfully, hence the `| undefined' types.
-record(state, {
    connection :: pid() | undefined,           % rabbitMQ connection
    connection_ref :: reference() | undefined, % connection monitor ref
    channel :: pid() | undefined,              % rabbitMQ channel
    channel_ref :: reference() | undefined,    % channel monitor ref
    % ---
    % delay, in milliseconds, applied before the server stops after an
    % AMQP failure (no trailing comma after the last field)
    rabbitmq_restart_timeout = 5000 :: pos_integer()
}).
 
%%==========
%% API
%%
%% @doc Starts the server, linked to the caller and registered locally
%% under the module name. Returns whatever gen_server:start_link/4 returns.
%% @end
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
 
%% gen_server callback. Schedules the actual AMQP connection attempt by
%% casting `connect' to itself, so init/1 returns without connecting,
%% and reads the restart timeout from the application environment
%% (falling back to 5000).
init(_Args) ->
    gen_server:cast(self(), connect),
    RestartTimeout = appenv:get(?APP, rabbitmq_restart_timeout, 5000),
    InitialState = #state{rabbitmq_restart_timeout = RestartTimeout},
    {ok, InitialState}.
 
%%
%% @doc Handles all raw messages sent to the server:
%% <ul>
%% <li>`basic.deliver' - acknowledges the delivery, then decodes the
%%     payload and logs it;</li>
%% <li>`DOWN' for the monitored connection or channel - logs the reason
%%     and stops the server via restart_me/1;</li>
%% <li>`basic.consume_ok' / `basic.cancel' - ignored;</li>
%% <li>anything else - logged as unsupported and ignored.</li>
%% </ul>
%% @end
handle_info({#'basic.deliver'{delivery_tag = Tag},
             #amqp_msg{props = #'P_basic'{}, payload = Body}},
            #state{channel = Channel} = State) ->
    % the message is acknowledged before it is decoded, so a payload
    % that cannot be decoded is not redelivered
    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
    % The payload comes from the broker and is therefore untrusted:
    % `safe' makes binary_to_term/2 fail with badarg instead of creating
    % new atoms or funs. Only the decoding itself is protected by the
    % try; the `of' branch is not.
    try binary_to_term(Body, [safe]) of
        Message ->
            ?INFO("You go: ~p", [Message])
    catch
        error:badarg ->
            ?ERR("Cannot parse message from shop: ~p", [Body])
    end,
    {noreply, State};

% the monitored AMQP connection process died
handle_info({'DOWN', ConnectionRef, process, Connection, Reason},
            #state{connection = Connection, connection_ref = ConnectionRef} = State) ->
    ?WARN("AMQP connection error: ~p", [Reason]),
    restart_me(State);

% the monitored AMQP channel process died
handle_info({'DOWN', ChannelRef, process, Channel, Reason},
            #state{channel = Channel, channel_ref = ChannelRef} = State) ->
    ?WARN("AMQP channel error: ~p", [Reason]),
    restart_me(State);

handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State};

handle_info(#'basic.cancel'{}, State) ->
    {noreply, State};

% drain anything unexpected so the mailbox does not grow
handle_info(Info, State) ->
    ?ERR("Unsupported info message: ~p", [Info]),
    {noreply, State}.
 
%% gen_server callback for asynchronous requests.
%%
%% `connect' opens the AMQP connection and a channel, monitors both and
%% stores them in the state. If the connection or the channel cannot be
%% opened, the error is logged and the server stops via restart_me/1.
%% Any other cast is logged as unsupported and ignored.
handle_cast(connect, State) ->
    % connection parameters, each overridable via the application env
    Params = #amqp_params_network{
        host = appenv:get(?APP, rabbitmq_host, "localhost"),
        username = appenv:get(?APP, rabbitmq_username, <<"max">>),
        password = appenv:get(?APP, rabbitmq_password, <<"password">>),
        port = appenv:get(?APP, rabbitmq_port, 5672),
        virtual_host = appenv:get(?APP, rabbitmq_vhost, <<"vhost">>),
        heartbeat = appenv:get(?APP, rabbitmq_heartbeat, 5)
    },
    case amqp_connection:start(Params) of
        {ok, Connection} ->
            % start connection monitor
            ConnectionRef = erlang:monitor(process, Connection),
            % The connection is stored in the state *before* the channel
            % is opened, so that terminate/2 can close it even when the
            % channel cannot be opened (previously it was leaked).
            open_amqp_channel(State#state{
                connection = Connection,
                connection_ref = ConnectionRef
            });
        ConnectionError ->
            % connection error
            ?WARN("AMQP connection error: ~p", [ConnectionError]),
            restart_me(State)
    end;

handle_cast(Msg, State) ->
    ?ERR("Unsupported cast message: ~p", [Msg]),
    {noreply, State}.

%% Opens and monitors a channel on the connection held in the state.
%% Returns a gen_server callback result: `{noreply, NewState}' on
%% success, otherwise whatever restart_me/1 returns.
open_amqp_channel(#state{connection = Connection} = State) ->
    case amqp_connection:open_channel(Connection) of
        {ok, Channel} ->
            ChannelRef = erlang:monitor(process, Channel),
            %
            % Here you have to subscribe to queues you want to listen
            %
            %
            {noreply, State#state{
                channel = Channel,
                channel_ref = ChannelRef
            }};
        ChannelError ->
            % channel error
            ?WARN("AMQP channel error: ~p", [ChannelError]),
            restart_me(State)
    end.
 
%% gen_server callback for synchronous requests. No call is supported:
%% the request and its sender are logged and the caller receives `ok'.
handle_call(Request, From, State) ->
    ?ERR("Unsupported call message: ~p, from: ~p", [Request, From]),
    Reply = ok,
    {reply, Reply, State}.
 
%% gen_server callback. Closes the channel and then the connection, if
%% they were ever opened. Always returns `ok'.
%%
%% Each close is attempted independently: when the server stops because
%% the channel (or connection) process died, closing that dead process
%% exits, and this must not prevent the other resource from being closed.
terminate(_Reason, #state{connection = Connection, channel = Channel}) ->
    close_quietly(fun amqp_channel:close/1, Channel),
    close_quietly(fun amqp_connection:close/1, Connection),
    ok.

%% Applies CloseFun to Pid when Pid is a pid; anything else (e.g. the
%% `undefined' default) is skipped. An exit raised by the close call
%% (for instance because the process is already gone) is ignored on
%% purpose - this is best-effort cleanup during shutdown.
close_quietly(CloseFun, Pid) when is_pid(Pid) ->
    try
        _ = CloseFun(Pid),
        ok
    catch
        exit:_ -> ok
    end;
close_quietly(_CloseFun, _NotAPid) ->
    ok.
 
%% gen_server callback for hot code upgrades. The state needs no
%% conversion, so it is handed back unchanged.
code_change(_FromVersion, CurrentState, _Extra) ->
    {ok, CurrentState}.
 
%% Logs that AMQP is down, blocks the server for the configured restart
%% timeout and then asks gen_server to stop with reason `error'.
%% `Wait' is passed to timer:sleep/1 as is, i.e. it is in milliseconds;
%% it is divided by 1000 only for the log message.
restart_me(#state{rabbitmq_restart_timeout = Wait} = State) ->
    Seconds = Wait / 1000,
    ?WARN("AMQP is down, will try to reconnect in ~p seconds.", [Seconds]),
    timer:sleep(Wait), % blocks for Wait milliseconds
    {stop, error, State}.

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.