Skip to content

Instantly share code, notes, and snippets.

@maxlapshin
Created August 29, 2019 10:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxlapshin/7873727e3f93519cf4bb14287264fd3d to your computer and use it in GitHub Desktop.
Save maxlapshin/7873727e3f93519cf4bb14287264fd3d to your computer and use it in GitHub Desktop.
-module(events_sink).
-export([start_link/2]).
-export([init/1, handle_info/2, handle_call/3, terminate/2]).
-export([overloaded/1]).
-include("event_pt.hrl").
-define(RECHECK_INTERVAL, 200).
-define(ALLOWED_DELAY, 60000).
start_link(Mod, Notify) ->
gen_server:start_link(?MODULE, [Mod, Notify], []).
overloaded(Name) ->
case ets:lookup(events_handlers, Name) of
[#handler{rtt_at = LastSeenAt}] ->
now_ms() - LastSeenAt > ?ALLOWED_DELAY;
[] ->
false
end.
-record(state, {
name,
mod,
args,
timer,
buffer,
throttle_delay,
throttle_timer,
throttle_buffer,
resend_notifications_limit,
resend_notifications_timeout,
resend_notifications_queue,
resend_notifications_queue_size,
resend_notifications_timer,
owner,
ref,
state
}).
now_ms() ->
os:system_time(milli_seconds).
init([Mod, #{name := Name} = Notify]) ->
{ok, State} = Mod:init(Notify),
Now = now_ms(),
ets:insert(events_handlers, #handler{name = Name, pid = self(), rtt_at = Now, started_at = Now,
verbose = maps:get(verbose, Notify, undefined), last_event_at = 0, rtt_delay = 0}),
State1 = #state{
name = Name,
mod = Mod,
state = State,
timer = erlang:send_after(0, self(), {round_trip, Now}),
buffer = maps:get(buffer, Notify, undefined),
throttle_delay = maps:get(throttle_delay, Notify, undefined),
args = Notify
},
State2 = monitor_owner(maps:get(owner,Notify,undefined), State1),
State3 = update_options(Notify, State2),
{ok, State3}.
update_options(Notify, #state{buffer = Buffer, resend_notifications_queue = OldQueue} = State) ->
ResendLimit = maps:get(resend_notifications_limit, Notify, undefined),
ResendTimeout = maps:get(resend_notifications_timeout, Notify, 10),
State1 = if
ResendLimit =/= undefined andalso Buffer =/= false andalso OldQueue == undefined ->
State#state{
resend_notifications_queue = [],
resend_notifications_queue_size = 0,
resend_notifications_limit = ResendLimit,
resend_notifications_timeout = ResendTimeout*1000
};
ResendLimit =/= undefined andalso Buffer =/= false andalso OldQueue =/= undefined ->
State#state{
resend_notifications_limit = ResendLimit,
resend_notifications_timeout = ResendTimeout*1000
};
ResendLimit == undefined orelse Buffer == false ->
State#state{
resend_notifications_queue = undefined,
resend_notifications_queue_size = undefined,
resend_notifications_limit = undefined,
resend_notifications_timeout = undefined
}
end,
State1.
handle_call(sync, _, #state{mod = Mod, state = S0} = State) ->
S1 = case erlang:function_exported(Mod,sync,1) of
true -> Mod:sync(S0);
false -> S0
end,
{reply, ok, State#state{state = S1}}.
handle_info({round_trip, SentAt}, #state{timer = OldTimer, name = Name} = State) ->
erlang:cancel_timer(OldTimer),
Now = now_ms(),
Delay = Now - SentAt,
ets:update_element(events_handlers, Name, [{#handler.rtt_at, Now}, {#handler.rtt_delay, Delay}]),
Timer = erlang:send_after(?RECHECK_INTERVAL, self(), {round_trip, Now + ?RECHECK_INTERVAL}),
{noreply, State#state{timer = Timer}};
handle_info({'$event', Evt}, #state{throttle_delay = ThrottleDelay} = State) when is_integer(ThrottleDelay) ->
Events = [Evt|collect_events(99)],
ThrottleBuffer = queue_add_list(Events, State#state.throttle_buffer),
State1 = ensure_throttle_timer(State),
{noreply, State1#state{throttle_buffer = ThrottleBuffer}};
handle_info(throttle_stop, #state{} = State0) ->
case State0#state.throttle_timer of
undefined -> ok;
_ -> erlang:cancel_timer(State0#state.throttle_timer)
end,
Events = collect_events(99),
ThrottleBuffer = queue:to_list(queue_add_list(Events, State0#state.throttle_buffer)),
State1 = State0#state{throttle_timer = undefined, throttle_buffer = undefined},
case ThrottleBuffer of
[] ->
{noreply, State1};
_ ->
Reply = deliver_events(ThrottleBuffer, State1),
Reply
end;
handle_info(resend_notifications_send, #state{resend_notifications_queue = Queue} = State) ->
if
Queue == undefined orelse Queue == [] ->
{noreply, State};
true ->
Reply = deliver_events([], State),
Reply
end;
handle_info({'$event', Evt}, #state{buffer = false} = State) ->
Reply = deliver_events([Evt], State),
Reply;
handle_info({'$event', Evt}, #state{} = State) ->
Events = [Evt|collect_events(99)],
Reply = deliver_events(Events, State),
Reply;
handle_info({'DOWN', Ref, _, _, _}, #state{ref = Ref} = State) ->
{stop, normal, State};
handle_info(Msg, #state{mod = Mod, state = S0} = State) ->
case Mod:handle_info(Msg, S0) of
{ok, S1} ->
{noreply, State#state{state = S1}};
{stop, Reason, S1} ->
{stop, Reason, State#state{state = S1}}
end.
deliver_events(Events, #state{mod = Mod, state = S0, resend_notifications_queue = Queue, resend_notifications_limit = Cnt, name = Name} = State) ->
{Events1, Queue1} = if
Queue == undefined orelse Queue == [] ->
{Events, Queue};
true ->
% Распиливаем queue_size на 4 части, что бы не одним большим батчем слать
lists:split(lists:min([lists:max([Cnt div 4,100]),State#state.resend_notifications_queue_size+length(Events)]), Queue ++ Events)
end,
Size1 = if
Queue1 == undefined -> undefined;
true -> length(Queue1)
end,
% [_|_] = Events1,
Reply = case Mod:handle_events(Events1, S0) of
{ok, S1} ->
Now = now_ms(),
ets:update_counter(events_handlers, Name, [{#handler.handled, length(Events)}, {#handler.last_event_at, Now, Now, Now}]),
{noreply, State#state{state = S1, resend_notifications_queue = Queue1, resend_notifications_queue_size = Size1}};
{resend, S1} when Queue == undefined ->
{noreply, State#state{state = S1}};
{resend, S1} when Queue =/= undefined ->
Queue2 = Events1++Queue1,
Size2 = length(Events1) + Size1,
Now = now_ms(),
ets:update_counter(events_handlers, Name, [{#handler.handled, length(Events)}, {#handler.last_event_at, Now, Now, Now}]),
State1 = State#state{resend_notifications_queue = Queue2, resend_notifications_queue_size = Size2, state = S1},
State2 = if
Size2 - 0 > 0 ->
Timer = case State#state.resend_notifications_timer of
undefined -> erlang:send_after(State1#state.resend_notifications_timeout, self(), resend_notifications_send);
RT0 -> RT0
end,
maybe_flush_resend_notifications_queue(State1#state{resend_notifications_timer = Timer});
true ->
State1
end,
{noreply, State2};
{stop, Reason, S1} ->
{stop, Reason, State#state{state = S1}}
end,
Reply.
maybe_flush_resend_notifications_queue(#state{resend_notifications_queue_size = S, resend_notifications_limit = L, resend_notifications_queue = Q} = State) when
S - L > 0 ->
Q1 = lists:nthtail(S - L, Q),
State#state{resend_notifications_queue_size = L, resend_notifications_queue = Q1};
maybe_flush_resend_notifications_queue(#state{} = State) ->
State.
queue_add_list(Items, {_,_}=Queue) -> queue:join(Queue, queue:from_list(Items));
queue_add_list(Items, _) -> queue:from_list(Items).
ensure_throttle_timer(#state{throttle_timer = undefined, throttle_delay = PushAfter} = State) ->
State#state{throttle_timer = erlang:send_after(PushAfter*1000, self(), throttle_stop)};
ensure_throttle_timer(#state{} = State) ->
State.
collect_events(Limit) when Limit =< 0 -> [];
collect_events(Limit) ->
receive
{'$event', E} -> [E|collect_events(Limit - 1)]
after
0 -> []
end.
monitor_owner(undefined, #state{} = State) ->
State;
monitor_owner(Pid, #state{} = State) ->
case Pid of
<<"<",_/binary>> ->
Owner = list_to_pid(binary_to_list(Pid)),
Ref = erlang:monitor(process, Owner),
State#state{owner = Owner, ref = Ref};
<<_,_/binary>> ->
Owner = whereis(binary_to_atom(Pid,latin1)),
Ref = erlang:monitor(process, Owner),
State#state{owner = Owner, ref = Ref}
end.
terminate(_,_) -> ok.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment