anonymous / gist:6cc6099af6c2255282e8 secret
Created

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist
View gist:6cc6099af6c2255282e8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
%%%
%%%
%%%
-module (module_name).
 
-behaviour(gen_server).
 
-include_lib("amqp_client/include/amqp_client.hrl").
 
-export([start_link/0]).
 
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
%% Internal state
-record(state, {
connection :: pid(), % rabbitMQ connection
connection_ref :: reference(), % connection monitor ref
channel :: pid(), % rabbitMQ channel
channel_ref :: reference(), % channel monitor ref
% ---
rabbitmq_restart_timeout = 5000 :: pos_integer(), % restart timeout
}).
 
%%==========
%% API
%%
%% @doc Starts the server
%% @end
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
init(_Args) ->
gen_server:cast(self(), connect),
{ok, #state{
rabbitmq_restart_timeout = appenv:get(?APP, rabbitmq_restart_timeout, 5000)
}}.
 
%%
%% @doc Handling all messages from RabbitMQ
%% @end
handle_info({#'basic.deliver'{delivery_tag = Tag, routing_key = _Queue}, #amqp_msg{props = #'P_basic'{reply_to = ReplyTo}, payload = Body}} = _Msg, #state{channel = Channel} = State) ->
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
try
Message = binary_to_term(Body),
?INFO("You go: ~p", [Message])
catch
_:_ ->
?ERR("Cannot parse message from shop: ~p", [Body]),
error
end,
{noreply, State};
 
handle_info({'DOWN', ConnectionRef, process, Connection, Reason}, #state{connection = Connection, connection_ref = ConnectionRef} = State) ->
?WARN("AMQP connection error: ~p", [Reason]),
restart_me(State);
 
handle_info({'DOWN', ChannelRef, process, Channel, Reason}, #state{channel = Channel, channel_ref = ChannelRef} = State) ->
?WARN("AMQP channel error: ~p", [Reason]),
restart_me(State);
 
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
 
handle_info(#'basic.cancel'{}, State) ->
{noreply, State};
 
handle_info(_Info, State) ->
?ERR("Unsupported info message: ~p", [_Info]),
{noreply, State}.
 
handle_cast(connect, State) ->
% connection parameters
AMQP_Param = #amqp_params_network{
host = appenv:get(?APP, rabbitmq_host, "localhost"),
username = appenv:get(?APP, rabbitmq_username, <<"max">>),
password = appenv:get(?APP, rabbitmq_password, <<"password">>),
port = appenv:get(?APP, rabbitmq_port, 5672),
virtual_host = appenv:get(?APP, rabbitmq_vhost, <<"vhost">>),
heartbeat = appenv:get(?APP, rabbitmq_heartbeat, 5)
},
% connection...
case amqp_connection:start(AMQP_Param) of
{ok, Connection} ->
% start connection monitor
ConnectionRef = erlang:monitor(process, Connection),
case amqp_connection:open_channel(Connection) of
{ok, Channel} ->
ChannelRef = erlang:monitor(process, Channel),
%
% Here you have to subscribe to queues you want to listen
%
%
{noreply, State#state{
connection = Connection,
connection_ref = ConnectionRef,
channel = Channel,
channel_ref = ChannelRef
}};
_Reason2 ->
% channel error
?WARN("AMQP channel error: ~p", [_Reason2]),
restart_me(State)
end;
_Reason1 ->
% connection error
?WARN("AMQP connection error: ~p", [_Reason1]),
restart_me(State)
end;
 
handle_cast(_Msg, State) ->
?ERR("Unsupported cast message: ~p", [_Msg]),
{noreply, State}.
 
handle_call(_Request, _From, State) ->
?ERR("Unsupported call message: ~p, from: ~p", [_Request, _From]),
{reply, ok, State}.
 
terminate(_Reason, #state{connection = Connection, channel = Channel} = _State) ->
if
is_pid(Channel) -> amqp_channel:close(Channel);
true -> pass
end,
if
is_pid(Connection) -> amqp_connection:close(Connection);
true -> pass
end,
ok.
 
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
 
restart_me(#state{rabbitmq_restart_timeout = Wait} = State) ->
?WARN("AMQP is down, will try to reconnect in ~p seconds.", [(Wait/1000)]),
timer:sleep(Wait), % Sleep for Wait seconds
{stop, error, State}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.