-
-
Save anonymous/6cc6099af6c2255282e8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%% | |
%%% | |
%%% | |
-module (module_name). | |
-behaviour(gen_server). | |
-include_lib("amqp_client/include/amqp_client.hrl"). | |
-export([start_link/0]). | |
%% gen_server callbacks | |
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). | |
%% Internal gen_server state.
%% The AMQP fields have no default, so they are `undefined` until the
%% `connect` cast has stored a connection and a channel.
-record(state, {
    connection :: pid() | undefined,                  % RabbitMQ connection
    connection_ref :: reference() | undefined,        % connection monitor ref
    channel :: pid() | undefined,                     % RabbitMQ channel
    channel_ref :: reference() | undefined,           % channel monitor ref
    % ---
    rabbitmq_restart_timeout = 5000 :: pos_integer()  % delay before stopping for a restart, in milliseconds
}).
%%========== | |
%% API | |
%% | |
%% @doc Starts the server and registers it locally under the module name.
%% No arguments are passed to init/1 and no gen_server options are set.
%% @end
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    ServerName = {local, ?MODULE},
    InitArgs = [],
    Options = [],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, Options).
%% @doc gen_server init callback.
%% Sends a `connect' cast to itself, so the AMQP connection is opened in
%% handle_cast/2 after init/1 has returned, and seeds the state with the
%% configured restart timeout (default 5000).
%% @end
init(_Args) ->
    gen_server:cast(self(), connect),
    RestartTimeout = appenv:get(?APP, rabbitmq_restart_timeout, 5000),
    InitialState = #state{rabbitmq_restart_timeout = RestartTimeout},
    {ok, InitialState}.
%% | |
%% @doc Handles raw messages: RabbitMQ deliveries, monitor 'DOWN' signals
%% for the connection and the channel, and consumer notifications.
%% Anything else is logged as unsupported and ignored.
%% @end
%% Delivery: the message is acknowledged first, then its payload is decoded.
handle_info(
    {#'basic.deliver'{delivery_tag = Tag}, #amqp_msg{props = #'P_basic'{}, payload = Body}},
    #state{channel = Channel} = State
) ->
    amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
    %% The payload arrives from the broker, i.e. from outside this node, so
    %% it is decoded in `safe' mode: a term that would create new atoms or
    %% external funs is rejected with badarg instead of being built.
    %% Only the decode itself is protected by the try.
    try binary_to_term(Body, [safe]) of
        Message ->
            ?INFO("You go: ~p", [Message])
    catch
        error:badarg ->
            ?ERR("Cannot parse message from shop: ~p", [Body])
    end,
    {noreply, State};
%% The monitored connection process went down.
handle_info(
    {'DOWN', ConnectionRef, process, Connection, Reason},
    #state{connection = Connection, connection_ref = ConnectionRef} = State
) ->
    ?WARN("AMQP connection error: ~p", [Reason]),
    restart_me(State);
%% The monitored channel process went down.
handle_info(
    {'DOWN', ChannelRef, process, Channel, Reason},
    #state{channel = Channel, channel_ref = ChannelRef} = State
) ->
    ?WARN("AMQP channel error: ~p", [Reason]),
    restart_me(State);
%% Consumer notifications need no action.
handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State};
handle_info(#'basic.cancel'{}, State) ->
    {noreply, State};
handle_info(Info, State) ->
    ?ERR("Unsupported info message: ~p", [Info]),
    {noreply, State}.
%% @doc Handles casts.
%% `connect' opens the AMQP connection and a channel, monitors both and
%% stores them in the state. On any failure the server goes through
%% restart_me/1. Every other cast is logged as unsupported and ignored.
%% @end
handle_cast(connect, State) ->
    case amqp_connection:start(connection_params()) of
        {ok, Connection} ->
            % start connection monitor
            ConnectionRef = erlang:monitor(process, Connection),
            open_channel(State#state{
                connection = Connection,
                connection_ref = ConnectionRef
            });
        ConnectionError ->
            % connection error
            ?WARN("AMQP connection error: ~p", [ConnectionError]),
            restart_me(State)
    end;
handle_cast(Msg, State) ->
    ?ERR("Unsupported cast message: ~p", [Msg]),
    {noreply, State}.

%% Builds the connection parameters from the application environment,
%% falling back to the defaults given here.
connection_params() ->
    #amqp_params_network{
        host = appenv:get(?APP, rabbitmq_host, "localhost"),
        username = appenv:get(?APP, rabbitmq_username, <<"max">>),
        password = appenv:get(?APP, rabbitmq_password, <<"password">>),
        port = appenv:get(?APP, rabbitmq_port, 5672),
        virtual_host = appenv:get(?APP, rabbitmq_vhost, <<"vhost">>),
        heartbeat = appenv:get(?APP, rabbitmq_heartbeat, 5)
    }.

%% Opens and monitors a channel on the connection already stored in State.
open_channel(#state{connection = Connection} = State) ->
    case amqp_connection:open_channel(Connection) of
        {ok, Channel} ->
            ChannelRef = erlang:monitor(process, Channel),
            %
            % Here you have to subscribe to queues you want to listen
            %
            {noreply, State#state{
                channel = Channel,
                channel_ref = ChannelRef
            }};
        ChannelError ->
            % channel error
            ?WARN("AMQP channel error: ~p", [ChannelError]),
            %% State already carries the open connection, so terminate/2
            %% can close it; passing the bare state would leak it.
            restart_me(State)
    end.
%% @doc No synchronous requests are supported: every call is logged as an
%% error and answered with `ok', leaving the state untouched.
%% @end
handle_call(Request, From, State) ->
    ?ERR("Unsupported call message: ~p, from: ~p", [Request, From]),
    Reply = ok,
    {reply, Reply, State}.
%% @doc Closes the channel and then the connection, if they were opened.
%% Always returns `ok'.
%% @end
terminate(_Reason, #state{connection = Connection, channel = Channel}) ->
    close_quietly(Channel, fun amqp_channel:close/1),
    close_quietly(Connection, fun amqp_connection:close/1),
    ok.

%% Best-effort close of an AMQP process.
%% This server usually stops because the connection or channel went down
%% (see the 'DOWN' clauses of handle_info/2), so the pid held in the state
%% may already be dead. Closing a dead process is expected to exit the
%% caller (NOTE(review): confirm against the amqp_client version in use);
%% that exit is ignored here so it cannot replace the real stop reason.
close_quietly(Pid, Close) when is_pid(Pid) ->
    try
        Close(Pid)
    catch
        exit:_ -> ok
    end,
    ok;
close_quietly(_NotOpened, _Close) ->
    % the field is still `undefined': nothing was opened, nothing to close
    ok.
%% @doc Code upgrade callback: the state needs no migration, so it is
%% returned unchanged.
%% @end
code_change(_FromVersion, CurrentState, _UpgradeData) ->
    Result = {ok, CurrentState},
    Result.
%% Logs that AMQP is down, blocks this process for the configured restart
%% timeout and then asks gen_server to stop with reason `error'.
%% The timeout is in milliseconds; it is divided by 1000 only for the log line.
restart_me(State = #state{rabbitmq_restart_timeout = DelayMs}) ->
    DelaySeconds = DelayMs / 1000,
    ?WARN("AMQP is down, will try to reconnect in ~p seconds.", [DelaySeconds]),
    timer:sleep(DelayMs),
    {stop, error, State}.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment