Skip to content

Instantly share code, notes, and snippets.

@maxlapshin
Last active August 29, 2019 10:45
Show Gist options
  • Save maxlapshin/ea632f04f79188aaba46d3a92406f8f6 to your computer and use it in GitHub Desktop.
Save maxlapshin/ea632f04f79188aaba46d3a92406f8f6 to your computer and use it in GitHub Desktop.
-module(events_router).
-export([start_link/0]).
-export([init/1, handle_info/2, handle_call/3, terminate/2]).
-export([sync/0]).
-export([deliver/2, remote_deliver/1]).
-export([load_config/1, add_handler/1, remove_handler/1]).
-export([suitable_events/1, match_event/2]).
-include("event_pt.hrl").
%% One routing entry stored in the `events_routes' ETS table.
%% The table is created in init/1 with `event' as its key position.
-record(route, {
event,   % event name this route applies to
name,    % name of the handler (sink) that owns the route
only,    % whitelist filters: a list of [{Key, Value}] lists
except,  % blacklist filters, same shape as `only'
verbose, % log level threshold of the route, or undefined
pid      % pid of the sink process, or undefined when it is not started
}).
%% @doc Publish `Event' together with its metadata (a map or a proplist).
%%
%% The metadata is extended with a fresh `event_id', the current `time',
%% `date' and `utc_ms', and the `event' name. Unless the event is `text' or
%% the caller already supplied both `message' and `loglevel', a formatted
%% `message' and the event's declared `loglevel' are added as well.
%%
%% If a process is registered as `events_proxy' the metadata is sent to it,
%% otherwise it is routed locally. Returns the event id, or `false' when
%% known_events:find/1 does not know the event.
deliver(Event, Metadata0) when is_list(Metadata0) ->
    deliver(Event, maps:from_list(Metadata0));
deliver(Event, Metadata0) ->
    EvtId = erlang:unique_integer([positive, monotonic]),
    Tagged = Metadata0#{event_id => EvtId},
    case known_events:find(Event) of
        false ->
            false;
        #event{} = EventDesc ->
            {PDateTime, PTime, UtcMs} = print_now(),
            Stamped = Tagged#{time => PTime, date => PDateTime, utc_ms => UtcMs, event => Event},
            Metadata = ensure_message(Event, EventDesc, Stamped),
            case whereis(events_proxy) of
                undefined -> local_deliver(Event, Metadata);
                Proxy -> Proxy ! {'$event', Metadata}
            end,
            EvtId
    end.

%% Add `message' and `loglevel' to the metadata when they are missing.
%% `text' events are passed through untouched.
ensure_message(text, _EventDesc, Metadata) ->
    Metadata;
ensure_message(_Event, _EventDesc, #{message := _, loglevel := _} = Metadata) ->
    Metadata;
ensure_message(_Event, #event{level = Level} = EventDesc, Metadata) ->
    Message =
        try
            events_util:format_message(EventDesc, Metadata)
        catch
            % A broken format must never prevent the event from being delivered.
            _:_ -> <<"failed to format">>
        end,
    Metadata#{message => Message, loglevel => Level}.
%% @doc Take a single timestamp from minute:now_ms/0 and return it as
%% `{Date, Time, UtcMs}', where Date and Time come from minute:log_time/1.
print_now() ->
    NowMs = minute:now_ms(),
    {PrintedDate, PrintedTime} = minute:log_time(NowMs),
    {PrintedDate, PrintedTime, NowMs}.
%% @doc Route already-prepared metadata locally. The event name is taken from
%% the `event' key of the metadata map, which must be present.
remote_deliver(Metadata) when is_map_key(event, Metadata) ->
    local_deliver(maps:get(event, Metadata), Metadata).
%% @doc Hand the metadata to every route stored for `Event' in the
%% `events_routes' table and return the event id.
%%
%% Only the table lookup is protected: if it raises, the event is printed
%% with io:format/2 instead (debug-level events are silently dropped and
%% `ok' is returned).
local_deliver(Event, #{event_id := EvtId, loglevel := Level, time := PTime} = Metadata) ->
    try ets:lookup(events_routes, Event) of
        Routes ->
            lists:foreach(fun(Route) -> deliver_to_route(Route, Metadata) end, Routes),
            EvtId
    catch
        _:_ ->
            case Level of
                debug ->
                    ok;
                _ ->
                    io:format("~s ~p ~p\n", [PTime, Level, maps:get(message, Metadata, undefined)])
            end
    end.
%% @doc Try to send the event to the sink behind one route.
%%
%% Returns `not_started' when the route has no pid, `overloaded' when
%% events_sink:overloaded/1 says so (the handler's `dropped' counter is
%% bumped), `nomatch' when the route's filters reject the event, and the
%% sent message otherwise.
deliver_to_route(#route{pid = undefined}, _Metadata) ->
    not_started;
deliver_to_route(#route{name = SinkName} = Route, Metadata) ->
    case events_sink:overloaded(SinkName) of
        true ->
            ets:update_counter(events_handlers, SinkName, [{#handler.dropped, 1}]),
            overloaded;
        false ->
            forward_if_matching(match_event(Route, Metadata), Route, Metadata)
    end.

%% Send the event to the sink only when the filter check succeeded.
forward_if_matching(true, #route{pid = Pid, event = Event}, Metadata) ->
    Pid ! {'$event', Metadata#{event => Event}};
forward_if_matching(false, _Route, _Metadata) ->
    nomatch.
%% @doc Translate a log level atom into a number that can be compared.
%% `none' sorts below every real level; `undefined' is passed through as is.
%% Any other atom raises function_clause.
loglevel(undefined) -> undefined;
loglevel(none)      -> -1;
loglevel(emergency) -> 7;
loglevel(alert)     -> 6;
loglevel(critical)  -> 5;
loglevel(error)     -> 4;
loglevel(warning)   -> 3;
loglevel(notice)    -> 2;
loglevel(info)      -> 1;
loglevel(debug)     -> 0.
%% @doc Decide whether the event described by `Metadata' passes a route.
%%
%% The event is rejected when the route has a `verbose' threshold and that
%% threshold is numerically above the event's `loglevel'. Otherwise the
%% route's `except' and `only' filters are applied via match_event/3, with
%% the route's event name stored under the `event' key of the metadata.
%%
%% `Metadata' must contain `loglevel' (badkey otherwise), and both levels
%% must be known to loglevel/1 (function_clause otherwise).
match_event(#route{only = Only, except = Except, event = Event, verbose = Verbose}, Metadata) ->
    Loglevel = maps:get(loglevel, Metadata),
    % Both codes are computed up front so that an unknown level still fails
    % loudly, even for routes without a verbosity threshold.
    VerboseCode = loglevel(Verbose),
    LoglevelCode = loglevel(Loglevel),
    % A route without `verbose' never skips on level. The former extra branch
    % for `Verbose == undefined andalso Loglevel == none' could never be
    % reached and has been removed.
    % If the event's loglevel is `undefined' its code is an atom, which sorts
    % above every number in Erlang term order, so the event is not skipped.
    Skip = Verbose =/= undefined andalso VerboseCode > LoglevelCode,
    case Skip of
        true -> false;
        false -> match_event(Except, Only, Metadata#{event => Event})
    end.
%% @doc Apply blacklist and whitelist filters to the metadata.
%%
%% Returns `false' as soon as any `Except' filter matches. Otherwise returns
%% `true' when `Only' is empty or at least one `Only' filter matches.
match_event(Except, Only, Meta) ->
    Matches = fun(Filter) -> match_meta(Filter, Meta) end,
    case lists:any(Matches, Except) of
        true ->
            false;
        false ->
            % An empty whitelist means "everything is allowed".
            Only =:= [] orelse lists:any(Matches, Only)
    end.
%% @doc Check that every `{Key, Expected}' pair of a filter holds for `Meta'.
%%
%% A list `Expected' matches when the metadata value (or `undefined' if the
%% key is absent) is one of its members. The binary `<<"*">>' matches any
%% value that is present and not `undefined'. Anything else must be exactly
%% equal to the metadata value. An empty filter matches everything.
match_meta([], _Meta) ->
    true;
match_meta([{Key, Expected} | Rest], Meta) ->
    Actual = maps:get(Key, Meta, undefined),
    value_matches(Expected, Actual) andalso match_meta(Rest, Meta).

%% Compare one expected filter value against the actual metadata value.
value_matches(Allowed, Actual) when is_list(Allowed) ->
    lists:member(Actual, Allowed);
value_matches(<<"*">>, Actual) when Actual =/= undefined ->
    true;
value_matches(Expected, Actual) ->
    Expected =:= Actual.
%% @doc Ask the router process to install the given map of handlers.
%% Blocks until the router has replied.
load_config(Notifies) when is_map(Notifies) ->
    Request = {install_notifies, Notifies},
    gen_server:call(?MODULE, Request).
%% @doc Ask the router process to add (or update) a single handler.
%% The handler description must be a map with a `name' key.
add_handler(Notify) when is_map_key(name, Notify) ->
    Request = {add_handler, Notify},
    gen_server:call(?MODULE, Request).
%% @doc Ask the router process to remove the handler called `HandlerName'.
remove_handler(HandlerName) ->
    Request = {remove_handler, HandlerName},
    gen_server:call(?MODULE, Request).
%% @doc Wait until the router and every running sink have processed the
%% messages sent to them so far.
%%
%% Each sink is first sent `throttle_stop' and then called with `sync'. A
%% sink that exits during the call is ignored. Always returns `ok'.
sync() ->
    gen_server:call(?MODULE, sync),
    % supervisor:which_children/1 reports `undefined' or `restarting' in
    % place of a pid for children that are not running. Sending to such an
    % atom raises badarg, so only real pids are synced.
    Sinks = [Pid || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(events_manager_sup),
                    is_pid(Pid)],
    lists:foreach(
        fun(Pid) ->
            Pid ! throttle_stop,
            try
                gen_server:call(Pid, sync)
            catch
                exit:_ -> ok
            end
        end,
        Sinks),
    ok.
%% @doc Start the router as a gen_server registered locally under the
%% module name.
start_link() ->
    RegisteredName = {local, ?MODULE},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).
%% gen_server state of the router process.
-record(state, {
refresh_timer,       % timer reference of a pending `refresh' message, or undefined
notifies = #{},      % handlers installed through load_config/1 (presumably keyed by handler name)
extra_notifies = #{} % handlers added through add_handler/1, keyed by handler name
}).
%% @doc gen_server callback: create the two public named ETS tables used for
%% routing (`events_routes', a bag keyed on the route's event) and handler
%% bookkeeping (`events_handlers', keyed on the handler name).
init([]) ->
    SharedOpts = [public, named_table, {read_concurrency, true}],
    _ = ets:new(events_routes, [bag, {keypos, #route.event} | SharedOpts]),
    _ = ets:new(events_handlers, [{keypos, #handler.name} | SharedOpts]),
    {ok, #state{}}.
%% @doc React to the death of process `Pid' within one handler map.
%%
%% Returns `false' when no handler in `List' owns that pid. Otherwise the
%% handler's child and routes are torn down and the updated map is returned:
%% the handler is dropped on a `normal' exit, and kept without its `pid' and
%% `ref' for any other reason.
handle_stop(Pid, Reason, List) ->
    Owners = [Notify || #{pid := P} = Notify <- maps:values(List), P == Pid],
    case Owners of
        [] ->
            false;
        [#{name := Name} = Notify] ->
            shutdown_old(Name),
            case Reason =:= normal of
                true -> maps:remove(Name, List);
                false -> maps:put(Name, maps:without([pid, ref], Notify), List)
            end
    end.
%% @doc gen_server callback for plain messages.
%%
%% `DOWN': forget the dead sink in whichever handler map owns it and make
%% sure a `refresh' is scheduled 2000 ms from now.
%% `refresh': cancel the pending timer (if any) and rebuild sinks and routes.
%% Any other message stops the server with reason `{bad_msg, Msg}'.
handle_info({'DOWN', _Ref, _Type, Pid, Reason},
            #state{notifies = Notifies, extra_notifies = Extra} = State) ->
    Updated =
        case handle_stop(Pid, Reason, Notifies) of
            false ->
                case handle_stop(Pid, Reason, Extra) of
                    false -> State;
                    Extra1 -> State#state{extra_notifies = Extra1}
                end;
            Notifies1 ->
                State#state{notifies = Notifies1}
        end,
    {noreply, schedule_refresh(Updated)};
handle_info(refresh, #state{refresh_timer = Timer} = State) ->
    _ = (Timer =:= undefined orelse erlang:cancel_timer(Timer)),
    {ok, Refreshed} = refresh(State#state{refresh_timer = undefined}),
    {noreply, Refreshed};
handle_info(Msg, State) ->
    {stop, {bad_msg, Msg}, State}.

%% Arm the refresh timer unless one is already pending.
schedule_refresh(#state{refresh_timer = undefined} = State) ->
    State#state{refresh_timer = erlang:send_after(2000, self(), refresh)};
schedule_refresh(#state{} = State) ->
    State.
%% @doc gen_server callback for synchronous requests.
%%
%% `sync'                        - replies `ok' (used as a mailbox barrier).
%% `{remove_handler, Name}'      - shuts the handler down, forgets it from the
%%                                 extra handlers and refreshes; replies `ok'.
%% `{add_handler, Notify}'       - stores the handler among the extra handlers
%%                                 and refreshes; replies with its name.
%% `{install_notifies, Notifies}' - replaces the configured handlers and
%%                                 refreshes; replies `ok'.
%% `handlers'                    - replies `{Notifies, Extra}'.
%% Anything else replies `{bad_call, Call}'.
handle_call(sync, _From, #state{} = State) ->
    {reply, ok, State};
handle_call({remove_handler, Name}, _From, #state{extra_notifies = Extra} = State) ->
    shutdown_old(Name),
    {ok, NewState} = refresh(State#state{extra_notifies = maps:remove(Name, Extra)}),
    {reply, ok, NewState};
handle_call({add_handler, #{name := Name, sink := Sink} = Notify}, _From,
            #state{extra_notifies = Extra} = State) ->
    Merged = maps:merge(Notify, reusable_process(Name, Sink, Extra)),
    {ok, NewState} = refresh(State#state{extra_notifies = maps:put(Name, Merged, Extra)}),
    {reply, Name, NewState};
handle_call({install_notifies, Notifies}, _From, #state{notifies = Previous} = State) ->
    Merged = maps:map(
        fun(Name, #{sink := Sink} = Notify) ->
            maps:merge(Notify, reusable_process(Name, Sink, Previous))
        end,
        Notifies),
    {ok, NewState} = refresh(State#state{notifies = Merged}),
    {reply, ok, NewState};
handle_call(handlers, _From, #state{extra_notifies = Extra, notifies = Notifies} = State) ->
    {reply, {Notifies, Extra}, State};
handle_call(Call, _From, State) ->
    {reply, {bad_call, Call}, State}.

%% Decide what can be kept from a previously known handler of the same name.
%% Same sink: keep its `pid' and `ref' so the running process is reused.
%% Different sink: shut the old process down and keep nothing.
%% Unknown handler (or one without a sink): nothing to keep.
reusable_process(Name, Sink, Known) ->
    case maps:get(Name, Known, #{}) of
        #{sink := Sink} = Old ->
            maps:with([pid, ref], Old);
        #{sink := _Other} ->
            shutdown_old(Name),
            #{};
        _ ->
            #{}
    end.
%% @doc gen_server callback: nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%% @doc Bring supervisor children and the routing table in line with the
%% handlers held in the state.
%%
%% Steps, in order: shut down children that no handler refers to any more,
%% launch a sink for every handler that has no pid, reinstall all routes,
%% and finally delete table entries that were not just installed.
%% Returns `{ok, NewState}'.
refresh(#state{notifies = Notifies, extra_notifies = Extra} = State) ->
    Running = [Id || {Id, _Pid, _Type, _Mods} <- supervisor:which_children(events_manager_sup)],
    Orphans = [Id || Id <- Running,
                     not (maps:is_key(Id, Notifies) orelse maps:is_key(Id, Extra))],
    lists:foreach(fun shutdown_old/1, Orphans),
    StartMissing = fun
        (_Name, #{pid := Pid} = Notify) when is_pid(Pid) -> Notify;
        (_Name, #{} = Notify) -> launch_new(Notify)
    end,
    Notifies2 = maps:map(StartMissing, Notifies),
    Extra2 = maps:map(StartMissing, Extra),
    FromConfig = install_notifies(maps:values(Notifies2)),
    FromExtra = install_notifies(maps:values(Extra2)),
    Installed = FromConfig ++ FromExtra,
    Stale = [Entry || Entry <- ets:tab2list(events_routes),
                      not lists:member(Entry, Installed)],
    lists:foreach(fun(Entry) -> ets:delete_object(events_routes, Entry) end, Stale),
    {ok, State#state{notifies = Notifies2, extra_notifies = Extra2}}.
%% @doc Remove every trace of handler `Name': terminate and delete its child
%% under `events_manager_sup', delete its routes from `events_routes' and its
%% entry from `events_handlers'. Results of the supervisor calls are ignored.
%% Always returns `ok'.
shutdown_old(Name) ->
    supervisor:terminate_child(events_manager_sup, Name),
    supervisor:delete_child(events_manager_sup, Name),
    Owned = lists:filter(
        fun(#route{name = Owner}) -> Owner == Name;
           (_Other) -> false
        end,
        ets:tab2list(events_routes)),
    lists:foreach(fun(Route) -> ets:delete_object(events_routes, Route) end, Owned),
    ets:delete(events_handlers, Name),
    ok.
%% @doc Start a sink process for the handler under `events_manager_sup' and
%% monitor it.
%%
%% The callback module is chosen from the protocol reported by
%% events_util:sink_protocol/1. When that protocol is `undefined' nothing is
%% started and the handler is returned unchanged. Otherwise the handler is
%% returned with `pid' and `ref' added. Crashes if the child cannot be
%% started.
launch_new(#{name := Name, sink := Sink} = Notify) ->
    case sink_module_for(events_util:sink_protocol(Sink)) of
        undefined ->
            Notify;
        Mod ->
            ChildSpec = #{
                id => Name,
                start => {events_sink, start_link, [Mod, Notify]},
                restart => transient,
                shutdown => 500
            },
            {ok, Pid} = supervisor:start_child(events_manager_sup, ChildSpec),
            MonitorRef = erlang:monitor(process, Pid),
            Notify#{pid => Pid, ref => MonitorRef}
    end.

%% Map a sink protocol to the module implementing it.
sink_module_for(Proto) ->
    case Proto of
        http -> events_sink_http;
        erl -> events_sink_erl;
        pid -> events_sink_pid;
        lua -> events_sink_lua;
        log -> events_sink_log;
        console -> events_sink_console;
        undefined -> undefined;
        {plugin, Plugin} -> list_to_atom("events_sink_" ++ atom_to_list(Plugin))
    end.
%% @doc Names of all events returned by known_events:all/0. Entries that are
%% not `event' records are skipped.
events() ->
    [Desc#event.name || Desc <- known_events:all(), is_record(Desc, event)].
%% @doc Insert routes for every handler in the list into `events_routes' and
%% return all inserted routes.
%%
%% For each handler one route per suitable event is created (see
%% suitable_events/1). If the handler has a `pid', that process is also sent
%% `{update_options, Notify}'.
install_notifies([#{name := Name} = Notify | Notifies]) ->
    Suitable = suitable_events(Notify),
    Pid = maps:get(pid, Notify, undefined),
    % Everything except the event name is identical for all routes of one
    % handler, so the filters are converted once instead of once per event.
    Template = #route{
        name = Name,
        verbose = maps:get(verbose, Notify, undefined),
        only = [atomize(maps:to_list(O)) || O <- maps:get(only, Notify, [])],
        except = [atomize(maps:to_list(E)) || E <- maps:get(except, Notify, [])],
        pid = Pid
    },
    Routes = [Template#route{event = Evt} || Evt <- Suitable],
    ets:insert(events_routes, Routes),
    case Pid of
        undefined -> ok;
        _ -> Pid ! {update_options, Notify}
    end,
    Routes ++ install_notifies(Notifies);
install_notifies([]) ->
    [].
%% @doc Convert the binary values of the `application', `module' and `event'
%% keys of a filter (a `[{Key, Value}]' list) into atoms. An `event' value
%% may also be a list of binaries. All other pairs are kept unchanged.
%%
%% Every pair of the list is processed. Previously the conversion stopped
%% after the first converted pair, leaving later values as binaries that
%% could never equal atom metadata.
atomize([{application, Evt} | List]) when is_binary(Evt) ->
    [{application, binary_to_atom(Evt, latin1)} | atomize(List)];
atomize([{module, Evt} | List]) when is_binary(Evt) ->
    [{module, binary_to_atom(Evt, latin1)} | atomize(List)];
atomize([{event, Evt} | List]) when is_binary(Evt) ->
    [{event, binary_to_atom(Evt, latin1)} | atomize(List)];
atomize([{event, Evts} | List]) when is_list(Evts) ->
    [{event, [binary_to_atom(Evt, latin1) || Evt <- Evts]} | atomize(List)];
atomize([{K, V} | List]) ->
    [{K, V} | atomize(List)];
atomize([]) ->
    [].
%% @doc Compute the event names a handler should get routes for.
%%
%% Starts from all known events; `crashed' and `text' are left out unless the
%% handler sets `verbose'. If the handler's `only' filters yield a whitelist
%% it is returned as is, otherwise the candidates minus the blacklist derived
%% from `except' are returned.
suitable_events(#{} = Notify) ->
    Known = lists:usort(events()),
    Candidates =
        case maps:get(verbose, Notify, undefined) =:= undefined of
            true -> Known -- [crashed, text];
            false -> Known
        end,
    OnlyFilters = maps:get(only, Notify, []),
    ExceptFilters = maps:get(except, Notify, []),
    Whitelist = select_whitelisted_events(OnlyFilters, Candidates),
    Blacklist = select_blacklisted_events(ExceptFilters, Candidates),
    case Whitelist of
        [] -> Candidates -- Blacklist;
        _ -> Whitelist
    end.
%% @doc Event names selected by a list of `only' filters; delegates to
%% merge_events/4 in `white' mode with an empty accumulator.
select_whitelisted_events(Filters, KnownEvents) ->
    Mode = white,
    merge_events(Filters, [], KnownEvents, Mode).
%% @doc Event names excluded by a list of `except' filters; delegates to
%% merge_events/4 in `black' mode with an empty accumulator.
select_blacklisted_events(Filters, KnownEvents) ->
    Mode = black,
    merge_events(Filters, [], KnownEvents, Mode).
%% @doc Collect the event names mentioned by a list of filter maps.
%%
%% `Acc' accumulates names; the result is sorted and de-duplicated.
%%
%% In `white' mode a filter's `event' value (a binary or a list of binaries)
%% is added to the result. The first filter without a usable `event' value
%% makes the result `All', because such a filter may accept any event.
%%
%% In `black' mode an event may only be removed from routing when the filter
%% excludes it unconditionally, i.e. when `event' is the filter's only key.
%% A filter with further conditions (e.g. `#{event => a, module => m}') only
%% excludes some occurrences of the event, so it must be left to the
%% per-message check. Any such filter, like a filter without `event', makes
%% the static blacklist `[]'.
merge_events([], Acc, _All, _WhiteBlack) ->
    lists:usort(Acc);
merge_events([#{event := Evts} = Filter | List], Acc, All, WhiteBlack)
  when is_list(Evts), WhiteBlack =/= black;
       is_list(Evts), map_size(Filter) =:= 1 ->
    merge_events(List, [binary_to_atom(Evt, latin1) || Evt <- Evts] ++ Acc, All, WhiteBlack);
merge_events([#{event := Evt} = Filter | List], Acc, All, WhiteBlack)
  when is_binary(Evt), WhiteBlack =/= black;
       is_binary(Evt), map_size(Filter) =:= 1 ->
    merge_events(List, [binary_to_atom(Evt, latin1) | Acc], All, WhiteBlack);
merge_events([#{} | _], _Acc, All, white) ->
    All;
merge_events([#{} | _], _Acc, _All, black) ->
    [].
%% @doc Return the argument as an atom: binaries are converted (latin1),
%% atoms are returned unchanged. Anything else raises function_clause.
to_a(Name) when is_binary(Name) ->
    binary_to_atom(Name, latin1);
to_a(Name) when is_atom(Name) ->
    Name.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment