Last active
August 29, 2019 10:45
-
-
Save maxlapshin/ea632f04f79188aaba46d3a92406f8f6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(events_router). | |
-export([start_link/0]). | |
-export([init/1, handle_info/2, handle_call/3, terminate/2]). | |
-export([sync/0]). | |
-export([deliver/2, remote_deliver/1]). | |
-export([load_config/1, add_handler/1, remove_handler/1]). | |
-export([suitable_events/1, match_event/2]). | |
-include("event_pt.hrl"). | |
%% One routing entry stored in the `events_routes' ETS table (a bag keyed on
%% the `event' field). A row is created per handler and per suitable event.
-record(route, { | |
%% event name; used as the ETS key position of `events_routes'
event, | |
%% handler name, taken from the `name' key of the handler's config map
name, | |
%% list of atomized whitelist filters (each a list of {Key, Value} pairs)
only, | |
%% list of atomized blacklist filters (each a list of {Key, Value} pairs)
except, | |
%% minimal log level accepted by the handler, or `undefined' when not set
verbose, | |
%% pid of the sink process, or `undefined' when no process was started
pid | |
}). | |
%% Publish `Event' with its metadata, given either as a proplist or as a map.
%%
%% The metadata is tagged with a fresh monotonic `event_id'. Unknown events
%% (not found by known_events:find/1) are dropped and `false' is returned.
%% Known events get `time', `date', `utc_ms' and `event' keys, plus a
%% formatted `message'/`loglevel' pair when those are not both supplied.
%% The result is sent to the registered `events_proxy' process if there is
%% one, otherwise it is routed locally. Returns the event id.
deliver(Event, Metadata0) when is_list(Metadata0) ->
    deliver(Event, maps:from_list(Metadata0));
deliver(Event, Metadata0) ->
    EvtId = erlang:unique_integer([positive, monotonic]),
    Tagged = Metadata0#{event_id => EvtId},
    case known_events:find(Event) of
        false ->
            false;
        #event{level = Level} = EventDesc ->
            {PDateTime, PTime, UtcMs} = print_now(),
            Stamped = Tagged#{
                time => PTime,
                date => PDateTime,
                utc_ms => UtcMs,
                event => Event
            },
            Metadata = ensure_message(Event, EventDesc, Level, Stamped),
            case whereis(events_proxy) of
                undefined ->
                    local_deliver(Event, Metadata);
                Proxy ->
                    Proxy ! {'$event', Metadata}
            end,
            EvtId
    end.

%% Make sure the metadata carries `message' and `loglevel'.
%% `text' events and metadata that already has both keys pass through
%% untouched; otherwise the message is rendered from the event description,
%% falling back to a fixed placeholder when rendering raises.
ensure_message(text, _EventDesc, _Level, Metadata) ->
    Metadata;
ensure_message(_Event, _EventDesc, _Level, #{message := _, loglevel := _} = Metadata) ->
    Metadata;
ensure_message(_Event, EventDesc, Level, Metadata) ->
    Message =
        try
            events_util:format_message(EventDesc, Metadata)
        catch
            _:_ ->
                <<"failed to format">>
        end,
    Metadata#{message => Message, loglevel => Level}.
%% Current time as {Date, Time, UtcMs}: the millisecond timestamp from
%% minute:now_ms/0 together with its printable date and time parts as
%% produced by minute:log_time/1.
print_now() ->
    NowMs = minute:now_ms(),
    {PrintedDate, PrintedTime} = minute:log_time(NowMs),
    {PrintedDate, PrintedTime, NowMs}.
%% Route an already prepared metadata map locally; the event name is read
%% from the `event' key of the map itself.
remote_deliver(Metadata = #{event := Event}) ->
    local_deliver(Event, Metadata).
%% Hand the metadata to every route registered for `Event' and return the
%% event id.
%%
%% Only the ETS lookup is protected: if it raises, debug events are silently
%% dropped and everything else is printed to standard output instead.
local_deliver(Event, Metadata = #{event_id := EvtId, loglevel := Level, time := PTime}) ->
    try ets:lookup(events_routes, Event) of
        Routes ->
            lists:foreach(fun(Route) -> deliver_to_route(Route, Metadata) end, Routes),
            EvtId
    catch
        _:_ ->
            case Level of
                debug ->
                    ok;
                _ ->
                    io:format("~s ~p ~p\n", [PTime, Level, maps:get(message, Metadata, undefined)])
            end
    end.
%% Send the event to the sink behind one route.
%%
%% Returns `not_started' for a route without a process, `overloaded' (after
%% bumping the handler's dropped counter) when the sink reports overload,
%% `nomatch' when the route filters reject the event, and otherwise the
%% message that was sent.
deliver_to_route(#route{pid = undefined}, _Metadata) ->
    not_started;
deliver_to_route(Route = #route{name = Name, pid = Pid, event = Event}, Metadata) ->
    case events_sink:overloaded(Name) of
        false ->
            case match_event(Route, Metadata) of
                false ->
                    nomatch;
                true ->
                    Pid ! {'$event', Metadata#{event => Event}}
            end;
        true ->
            ets:update_counter(events_handlers, Name, [{#handler.dropped, 1}]),
            overloaded
    end.
%% Numeric rank of a log level, from 0 (debug) up to 7 (emergency).
%% `none' ranks below every level; `undefined' is passed through unchanged.
loglevel(emergency) -> 7;
loglevel(alert) -> 6;
loglevel(critical) -> 5;
loglevel(error) -> 4;
loglevel(warning) -> 3;
loglevel(notice) -> 2;
loglevel(info) -> 1;
loglevel(debug) -> 0;
loglevel(none) -> -1;
loglevel(undefined) -> undefined.
%% Decide whether a route accepts the event described by `Metadata'.
%%
%% The `loglevel' key is mandatory in `Metadata'. A route with a `verbose'
%% threshold rejects events whose level ranks below that threshold; a route
%% without one (`undefined') skips the level check. Events that pass are
%% then matched against the route's `except' and `only' filters, with the
%% route's own event name stored under the `event' key.
match_event(#route{only = Only, except = Except, event = Event, verbose = Verbose}, Metadata) ->
    Loglevel = maps:get(loglevel, Metadata),
    %% Both levels are always converted, so an unknown level still fails
    %% loudly in loglevel/1 regardless of the route's verbosity.
    VerboseCode = loglevel(Verbose),
    LoglevelCode = loglevel(Loglevel),
    TooQuiet = Verbose =/= undefined andalso VerboseCode > LoglevelCode,
    case TooQuiet of
        true ->
            false;
        false ->
            match_event(Except, Only, Metadata#{event => Event})
    end.
%% Apply blacklist and whitelist filters to a metadata map.
%% The event is rejected when any `Except' filter matches; otherwise it is
%% accepted when `Only' is empty or at least one `Only' filter matches.
match_event(Except, Only, Meta) ->
    Matches = fun(Filter) -> match_meta(Filter, Meta) end,
    not lists:any(Matches, Except) andalso
        (Only =:= [] orelse lists:any(Matches, Only)).
%% True when every {Key, Expected} pair of a filter holds for `Meta'.
%%   * a list of expected values matches when the metadata value (or
%%     `undefined' if the key is absent) is a member of that list;
%%   * the binary <<"*">> matches any value other than `undefined';
%%   * anything else must be exactly equal to the metadata value.
%% An empty filter matches everything.
match_meta(Filter, Meta) ->
    PairHolds = fun
        ({K, Values}) when is_list(Values) ->
            lists:member(maps:get(K, Meta, undefined), Values);
        ({K, V}) ->
            Actual = maps:get(K, Meta, undefined),
            (Actual =/= undefined andalso V == <<"*">>) orelse Actual =:= V
    end,
    lists:all(PairHolds, Filter).
%% Replace the configured handlers with the given map by asking the router
%% process to install them. Blocks until the router replies.
load_config(Notifies) when is_map(Notifies) ->
    Request = {install_notifies, Notifies},
    gen_server:call(?MODULE, Request).
%% Register one additional handler (its map must carry a `name' key) with
%% the router process and return the router's reply.
add_handler(Notify = #{name := _}) ->
    Request = {add_handler, Notify},
    gen_server:call(?MODULE, Request).
%% Ask the router process to drop the additional handler called `Name'.
remove_handler(Name) ->
    Request = {remove_handler, Name},
    gen_server:call(?MODULE, Request).
%% Wait until the router and every running sink have processed what was
%% queued before this call.
%%
%% The router is synced first; then each live child of `events_manager_sup'
%% receives `throttle_stop' followed by a synchronous `sync' call. A sink
%% that exits during the call is ignored. Always returns `ok'.
sync() ->
    gen_server:call(?MODULE, sync),
    %% which_children/1 reports `undefined' or `restarting' instead of a pid
    %% for children that are not running; sending to those atoms would raise
    %% badarg, so only real pids are contacted.
    Sinks = [
        Pid
     || {_Id, Pid, _Type, _Modules} <- supervisor:which_children(events_manager_sup),
        is_pid(Pid)
    ],
    lists:foreach(
        fun(Pid) ->
            Pid ! throttle_stop,
            try
                gen_server:call(Pid, sync)
            catch
                exit:_ -> ok
            end
        end,
        Sinks
    ),
    ok.
%% Start the router as a gen_server registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% State of the router gen_server.
-record(state, { | |
%% reference of the pending `refresh' timer, or `undefined' when none is armed
refresh_timer, | |
%% handlers installed through the `install_notifies' call, keyed by name
notifies = #{}, | |
%% handlers added one by one through the `add_handler' call, keyed by name
extra_notifies = #{} | |
}). | |
%% gen_server callback: create the two public named ETS tables used for
%% routing (`events_routes', a bag keyed by event) and for per-handler
%% bookkeeping (`events_handlers', keyed by handler name).
init([]) ->
    Shared = [public, named_table, {read_concurrency, true}],
    ets:new(events_routes, [bag, {keypos, #route.event} | Shared]),
    ets:new(events_handlers, [{keypos, #handler.name} | Shared]),
    {ok, #state{}}.
%% React to the death of process `Pid' within one handler map.
%%
%% Returns `false' when no handler in `List' owns that pid. Otherwise the
%% handler's child and routes are torn down and the updated map is returned:
%% the handler is removed after a `normal' exit, or kept without its
%% `pid'/`ref' keys after any other exit reason.
handle_stop(Pid, Reason, List) ->
    Owners = [N || N = #{pid := P} <- maps:values(List), P == Pid],
    case Owners of
        [] ->
            false;
        [Notify = #{name := Name}] ->
            shutdown_old(Name),
            if
                Reason =:= normal ->
                    maps:remove(Name, List);
                true ->
                    List#{Name => maps:without([pid, ref], Notify)}
            end
    end.
%% gen_server callback for raw messages.
%%   * 'DOWN': forget the dead sink in whichever handler map owns it
%%     (configured handlers are checked first) and arm a 2 second refresh
%%     timer unless one is already pending;
%%   * refresh: cancel the stored timer, if any, and rebuild sinks and routes;
%%   * anything else stops the server with reason {bad_msg, Msg}.
handle_info({'DOWN', _Ref, _Type, Pid, Reason}, State = #state{notifies = Notifies, extra_notifies = Extra}) ->
    Cleaned =
        case handle_stop(Pid, Reason, Notifies) of
            false ->
                case handle_stop(Pid, Reason, Extra) of
                    false -> State;
                    Extra1 -> State#state{extra_notifies = Extra1}
                end;
            Notifies1 ->
                State#state{notifies = Notifies1}
        end,
    {noreply, schedule_refresh(Cleaned)};
handle_info(refresh, State = #state{refresh_timer = Timer}) ->
    _ = Timer =:= undefined orelse erlang:cancel_timer(Timer),
    {ok, Refreshed} = refresh(State#state{refresh_timer = undefined}),
    {noreply, Refreshed};
handle_info(Msg, State) ->
    {stop, {bad_msg, Msg}, State}.

%% Arm the delayed `refresh' message unless a timer is already stored.
schedule_refresh(State = #state{refresh_timer = undefined}) ->
    State#state{refresh_timer = erlang:send_after(2000, self(), refresh)};
schedule_refresh(State) ->
    State.
%% gen_server callback for synchronous requests.
%%   * sync                        - replies `ok' once reached in the queue;
%%   * {remove_handler, Name}      - tears the handler down and refreshes;
%%   * {add_handler, Notify}       - stores an additional handler, reusing the
%%                                   running process when the sink is unchanged;
%%   * {install_notifies, Map}     - replaces the configured handlers, again
%%                                   reusing processes whose sink is unchanged;
%%   * handlers                    - replies with {Configured, Additional};
%%   * anything else               - replies {bad_call, Call}.
handle_call(sync, _From, State = #state{}) ->
    {reply, ok, State};
handle_call({remove_handler, Name}, _From, State = #state{extra_notifies = Extra}) ->
    shutdown_old(Name),
    Remaining = maps:remove(Name, Extra),
    {ok, State1} = refresh(State#state{extra_notifies = Remaining}),
    {reply, ok, State1};
handle_call({add_handler, Notify = #{name := Name, sink := NewSink}}, _From, State = #state{extra_notifies = Extra}) ->
    Merged = maps:merge(Notify, reuse_process(Name, NewSink, Extra)),
    {ok, State1} = refresh(State#state{extra_notifies = Extra#{Name => Merged}}),
    {reply, Name, State1};
handle_call({install_notifies, Notifies}, _From, State = #state{notifies = OldNotifies}) ->
    Carry = fun(Name, N = #{sink := NewSink}) ->
        maps:merge(N, reuse_process(Name, NewSink, OldNotifies))
    end,
    {ok, State1} = refresh(State#state{notifies = maps:map(Carry, Notifies)}),
    {reply, ok, State1};
handle_call(handlers, _From, State = #state{extra_notifies = Extra, notifies = Notifies}) ->
    {reply, {Notifies, Extra}, State};
handle_call(Call, _From, State) ->
    {reply, {bad_call, Call}, State}.

%% `pid'/`ref' entries of the handler currently known under `Name', provided
%% it uses exactly `NewSink'. A known handler with a different sink is shut
%% down; in that case, and for unknown handlers, an empty map is returned.
reuse_process(Name, NewSink, Known) ->
    case maps:get(Name, Known, #{}) of
        #{sink := NewSink} = Old ->
            maps:with([pid, ref], Old);
        #{sink := _} ->
            shutdown_old(Name),
            #{};
        _ ->
            #{}
    end.
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Reason, _State) ->
    ok.
%% Bring supervisor children and the routing table in line with the state.
%%
%% Children of `events_manager_sup' that belong to no known handler are shut
%% down, handlers without a live pid get a process launched, routes for all
%% handlers are (re)installed, and every row of `events_routes' that was not
%% installed in this pass is deleted. Returns {ok, NewState}.
refresh(State = #state{notifies = Notifies, extra_notifies = Extra}) ->
    Running = [Id || {Id, _, _, _} <- supervisor:which_children(events_manager_sup)],
    Orphans = [
        Id
     || Id <- Running,
        not (maps:is_key(Id, Notifies) orelse maps:is_key(Id, Extra))
    ],
    lists:foreach(fun shutdown_old/1, Orphans),
    EnsureStarted = fun
        (_Name, #{pid := Pid} = N) when is_pid(Pid) -> N;
        (_Name, #{} = N) -> launch_new(N)
    end,
    Notifies2 = maps:map(EnsureStarted, Notifies),
    Extra2 = maps:map(EnsureStarted, Extra),
    FromConfig = install_notifies(maps:values(Notifies2)),
    FromExtra = install_notifies(maps:values(Extra2)),
    %% rows of a bag table are unique, so list subtraction leaves exactly the
    %% rows that were not installed above
    Stale = ets:tab2list(events_routes) -- (FromConfig ++ FromExtra),
    lists:foreach(fun(Row) -> ets:delete_object(events_routes, Row) end, Stale),
    {ok, State#state{notifies = Notifies2, extra_notifies = Extra2}}.
%% Remove every trace of handler `Name': terminate and delete its child in
%% `events_manager_sup' (results are ignored), delete its rows from
%% `events_routes' and its entry in `events_handlers'. Always returns `ok'.
shutdown_old(Name) ->
    _ = supervisor:terminate_child(events_manager_sup, Name),
    _ = supervisor:delete_child(events_manager_sup, Name),
    Owned = [R || R = #route{name = N} <- ets:tab2list(events_routes), N == Name],
    lists:foreach(fun(R) -> ets:delete_object(events_routes, R) end, Owned),
    ets:delete(events_handlers, Name),
    ok.
%% Start the sink process for a handler and return the handler map extended
%% with its `pid' and monitor `ref'.
%%
%% The callback module is chosen from the protocol reported by
%% events_util:sink_protocol/1; plugin protocols map to
%% `events_sink_<Plugin>'. When the protocol is `undefined' nothing is
%% started and the map is returned unchanged.
launch_new(Notify = #{name := Name, sink := Sink}) ->
    Mod =
        case events_util:sink_protocol(Sink) of
            http -> events_sink_http;
            erl -> events_sink_erl;
            pid -> events_sink_pid;
            lua -> events_sink_lua;
            log -> events_sink_log;
            console -> events_sink_console;
            undefined -> undefined;
            {plugin, Plugin} -> list_to_atom("events_sink_" ++ atom_to_list(Plugin))
        end,
    start_sink(Mod, Name, Notify).

%% Start and monitor a transient `events_sink' child under
%% `events_manager_sup'; a module of `undefined' means there is nothing to
%% start.
start_sink(undefined, _Name, Notify) ->
    Notify;
start_sink(Mod, Name, Notify) ->
    ChildSpec = #{
        id => Name,
        start => {events_sink, start_link, [Mod, Notify]},
        restart => transient,
        shutdown => 500
    },
    {ok, Pid} = supervisor:start_child(events_manager_sup, ChildSpec),
    Ref = erlang:monitor(process, Pid),
    Notify#{pid => Pid, ref => Ref}.
%% Names of all `#event{}' records returned by known_events:all/0;
%% entries that are not `#event{}' records are skipped.
events() ->
    [Evt#event.name || Evt <- known_events:all(), is_record(Evt, event)].
%% Insert the routes of each handler into `events_routes' and return all
%% inserted `#route{}' rows, in handler order.
%%
%% For every handler one row per suitable event is written; all rows of a
%% handler share its name, verbosity, atomized `only'/`except' filters and
%% pid. A handler with a pid is also sent {update_options, Notify} so the
%% sink can pick up changed settings.
install_notifies([]) ->
    [];
install_notifies([#{name := Name} = Notify | Notifies]) ->
    Pid = maps:get(pid, Notify, undefined),
    %% Everything except the event is identical for all rows of a handler,
    %% so the filters are atomized once instead of once per event.
    Template = #route{
        name = Name,
        verbose = maps:get(verbose, Notify, undefined),
        only = [atomize(maps:to_list(O)) || O <- maps:get(only, Notify, [])],
        except = [atomize(maps:to_list(E)) || E <- maps:get(except, Notify, [])],
        pid = Pid
    },
    Routes = [Template#route{event = Evt} || Evt <- suitable_events(Notify)],
    ets:insert(events_routes, Routes),
    case Pid of
        undefined -> ok;
        _ -> Pid ! {update_options, Notify}
    end,
    Routes ++ install_notifies(Notifies).
%% Convert the binary values of the `application', `module' and `event'
%% keys of a filter (a list of {Key, Value} pairs) to atoms; an `event'
%% value may also be a list of binaries. All other pairs are kept as is.
%%
%% Every clause continues with the rest of the list, so all pairs of a
%% filter are converted, not just the first convertible one.
atomize([{application, App} | List]) when is_binary(App) ->
    [{application, binary_to_atom(App, latin1)} | atomize(List)];
atomize([{module, Mod} | List]) when is_binary(Mod) ->
    [{module, binary_to_atom(Mod, latin1)} | atomize(List)];
atomize([{event, Evt} | List]) when is_binary(Evt) ->
    [{event, binary_to_atom(Evt, latin1)} | atomize(List)];
atomize([{event, Evts} | List]) when is_list(Evts) ->
    [{event, [binary_to_atom(Evt, latin1) || Evt <- Evts]} | atomize(List)];
atomize([{K, V} | List]) ->
    [{K, V} | atomize(List)];
atomize([]) ->
    [].
%% Event names for which a handler needs a route.
%%
%% Starts from all known events, without `crashed' and `text' when the
%% handler sets no `verbose' level. If the `only' filters select any events,
%% exactly those are returned; otherwise the candidates minus the events
%% selected by the `except' filters.
suitable_events(Notify = #{}) ->
    Known = lists:usort(events()),
    Candidates =
        case Notify of
            #{verbose := Verbose} when Verbose =/= undefined -> Known;
            _ -> Known -- [crashed, text]
        end,
    Selected = select_whitelisted_events(maps:get(only, Notify, []), Candidates),
    Excluded = select_blacklisted_events(maps:get(except, Notify, []), Candidates),
    if
        Selected =:= [] -> Candidates -- Excluded;
        true -> Selected
    end.
%% Events selected by a list of `only' filters, computed by merge_events/4
%% in `white' mode starting from an empty accumulator.
select_whitelisted_events(Filters, All) ->
    merge_events(Filters, [], All, white).
%% Events selected by a list of `except' filters, computed by merge_events/4
%% in `black' mode starting from an empty accumulator.
select_blacklisted_events(Filters, All) ->
    merge_events(Filters, [], All, black).
%% Collect the event names a list of filter maps refers to, as a sorted list
%% without duplicates.
%%
%% `white' mode (for `only' filters): a filter without a usable `event' key
%% can match any event, so `All' is returned as soon as one is met.
%%
%% `black' mode (for `except' filters): only filters whose single key is
%% `event' exclude an event unconditionally. A filter with further keys
%% excludes an event only when those keys match too, which is decided per
%% message by match_meta/2, so such filters - and filters without an event -
%% are skipped here without discarding what was collected so far.
%%
%% Event names may be binaries or atoms, single or in a list.
merge_events([], Acc, _All, _Mode) ->
    lists:usort(Acc);
merge_events([Filter | Rest], Acc, All, Mode) when is_map(Filter) ->
    case {filter_events(Filter), Mode} of
        {any, white} ->
            All;
        {any, black} ->
            merge_events(Rest, Acc, All, Mode);
        {{events, _}, black} when map_size(Filter) > 1 ->
            merge_events(Rest, Acc, All, Mode);
        {{events, Events}, _} ->
            merge_events(Rest, Events ++ Acc, All, Mode)
    end.

%% Events named by one filter as {events, Atoms}, or `any' when the filter
%% has no `event' key or its value is not a list, binary or (defined) atom.
filter_events(#{event := Evts}) when is_list(Evts) ->
    {events, [to_a(Evt) || Evt <- Evts]};
filter_events(#{event := Evt}) when is_binary(Evt); is_atom(Evt), Evt =/= undefined ->
    {events, [to_a(Evt)]};
filter_events(#{}) ->
    any.
%% Return an atom for an atom or a latin1 binary; atoms pass through as is.
to_a(Bin) when is_binary(Bin) ->
    binary_to_atom(Bin, latin1);
to_a(Atom) when is_atom(Atom) ->
    Atom.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment