Skip to content

Instantly share code, notes, and snippets.

@pinkerltm
Last active March 2, 2020 08:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pinkerltm/8c8458830b8d2047a8d796fb3cfefc28 to your computer and use it in GitHub Desktop.
Save pinkerltm/8c8458830b8d2047a8d796fb3cfefc28 to your computer and use it in GitHub Desktop.
brod group subscriber supervisor
%% Supervisor for the brod clients and brod group subscribers listed in
%% ?CHILD_SPECS.
-module(group_subscriber_sup).
-author("jzornig").
-behaviour(supervisor).
%% Name under which the supervisor is registered locally (see start_link/1).
-define(SERVER, ?MODULE).
%% Host and port of the broker endpoint handed to every brod client.
-define(BROKER, "localhost").
-define(BROKERPORT,9092).
%% Topic the group subscribers consume; also the prefix of their group ids.
-define(TOPIC,<<"test">>).
%% Child spec of a brod client connected to {?BROKER, ?BROKERPORT}.
%% ClientId is used both as the supervisor child id and as the client id
%% argument of brod_client:start_link/3.
-define(BROD_CLIENT_SPEC(ClientId),
    #{
        id => ClientId,
        %% The child process runs brod_client code, so that module (and not
        %% this supervisor) is what the release handler has to know about.
        modules => [brod_client],
        restart => permanent,
        shutdown => 1000,
        type => worker,
        start => {brod_client, start_link, [[{?BROKER, ?BROKERPORT}], ClientId, []]}
    }
).

%% Child spec of a brod group subscriber (v2).
%%   ChildId     - supervisor child id
%%   ClientId    - id of the brod client the subscriber uses
%%   GroupSuffix - string literal appended to the topic to form the group id
%%   CbModule    - callback module handed to brod as cb_module
%%
%% NOTE: the expansion refers to a variable named TOPIC, which must be bound
%% (to a binary) in the function that uses the macro.
-define(GROUP_SUBSCRIBER_SPEC(ChildId, ClientId, GroupSuffix, CbModule),
    #{
        id => ChildId,
        modules => [CbModule],
        restart => permanent,
        shutdown => 1000,
        type => worker,
        start => {brod, start_link_group_subscriber_v2, [#{
            client => ClientId,
            group_id => <<TOPIC/binary, GroupSuffix>>,
            topics => [TOPIC],
            cb_module => CbModule,
            init_data => #{}, % -> init Config
            message_type => message,
            consumer_config => [
                {begin_offset, latest},
                {offset_reset_policy, reset_to_earliest},
                {max_wait_time, 5000},
                {sleep_timeout, 3000}
            ],
            group_config => [
                {offset_commit_policy, commit_to_kafka_v2},
                {offset_commit_interval_seconds, 5}
            ]
        }]}
    }
).

%% All statically configured children, in start order: the three brod
%% clients first, then the group subscribers that use them.
%% Requires a bound TOPIC variable at the point of use (see
%% ?GROUP_SUBSCRIBER_SPEC).
-define(CHILD_SPECS,
    [
        %% === Put your Child Specs here ===
        ?BROD_CLIENT_SPEC(odp_brod_client_console),
        ?BROD_CLIENT_SPEC(odp_brod_client_crate),
        ?BROD_CLIENT_SPEC(odp_brod_client_state),
        ?GROUP_SUBSCRIBER_SPEC(odp_cg_state, odp_brod_client_state,
                               "-to-state", odp_kafka_state),
        ?GROUP_SUBSCRIBER_SPEC(odp_cg_console, odp_brod_client_console,
                               "-to-console", odp_kafka_console),
        ?GROUP_SUBSCRIBER_SPEC(odp_cg_crate, odp_brod_client_crate,
                               "-to-crate", odp_kafka_crate)
    ]
).
%% API
-export([init/1, start_link/1, start_link/0, delete/1, restart/1, status/1, count/0, list/0, add/1, brod_restart/0]).
%%%===================================================================
%%% API functions
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor with an empty argument map.
%% Shorthand for `start_link(#{})'.
%% @end
%%--------------------------------------------------------------------
start_link() ->
    NoArgs = #{},
    start_link(NoArgs).
%% @doc
%% Starts the supervisor and registers it locally as ?SERVER.
%% `Args' is wrapped in a one-element list and handed to init/1.
%% @end
start_link(Args) ->
    RegName = {local, ?SERVER},
    supervisor:start_link(RegName, ?MODULE, [Args]).
%% @doc
%% Supervisor callback: default init during boot.
%% Ignores its argument and returns the one_for_one supervisor flags
%% together with the children listed in ?CHILD_SPECS.
%% @end
init(_) ->
    %% ?CHILD_SPECS refers to a variable named TOPIC, so it has to be bound
    %% before the macro is expanded.
    TOPIC = ?TOPIC,
    Children = ?CHILD_SPECS,
    Flags = #{
        strategy => one_for_one,
        intensity => 100,
        period => 10
    },
    {ok, {Flags, Children}}.
%% @doc
%% Starts `ChildSpec' under this supervisor, replacing any child that is
%% already known under the same id.
%%
%% `ChildSpec' must be a map containing at least the `id' key.
%% Returns the result of supervisor:start_child/2.
%% @end
add(#{id := ChildID} = ChildSpec) ->
    %% get_childspec/2 answers {ok, Spec} | {error, not_found}; matching the
    %% tagged tuples keeps the "not found" branch reachable.
    case supervisor:get_childspec(?MODULE, ChildID) of
        {error, not_found} ->
            supervisor:start_child(?MODULE, ChildSpec);
        {ok, _OldSpec} ->
            %% Best-effort removal of the old child; the outcome of
            %% start_child/2 below is what the caller gets to see.
            supervisor:terminate_child(?MODULE, ChildID),
            supervisor:delete_child(?MODULE, ChildID),
            supervisor:start_child(?MODULE, ChildSpec)
    end.
%% @doc
%% Terminates the child `ChildID' and removes its spec from this supervisor.
%% The result of terminate_child/2 is ignored; the result of delete_child/2
%% is returned.
%% @end
delete(ChildID) ->
    Sup = ?MODULE,
    supervisor:terminate_child(Sup, ChildID),
    supervisor:delete_child(Sup, ChildID).
%% @doc
%% Terminates the child `ChildID' and asks the supervisor to start it again.
%% The result of terminate_child/2 is ignored; the result of restart_child/2
%% is returned.
%% @end
restart(ChildID) ->
    Sup = ?MODULE,
    supervisor:terminate_child(Sup, ChildID),
    supervisor:restart_child(Sup, ChildID).
%% @doc
%% Looks up the child spec stored for `ChildID' in this supervisor and
%% returns the result of supervisor:get_childspec/2 unchanged.
%% @end
status(ChildID) ->
    Sup = ?MODULE,
    supervisor:get_childspec(Sup, ChildID).
%% @doc
%% Returns the result of supervisor:count_children/1 for this supervisor.
%% @end
count() ->
    Sup = ?MODULE,
    supervisor:count_children(Sup).
%% @doc
%% Returns the result of supervisor:which_children/1 for this supervisor.
%% @end
list() ->
    Sup = ?MODULE,
    supervisor:which_children(Sup).
%% @doc
%% Little helper to push and test quickly: removes a few children and then
%% (re-)adds every spec in ?CHILD_SPECS through add/1.
%%
%% Adding stops at the first spec that fails to start. Returns `{ok, []}'
%% when every spec was started and `{error, []}' otherwise.
%% @end
brod_restart() ->
    %% ?CHILD_SPECS refers to a variable named TOPIC, so bind it first.
    TOPIC = ?TOPIC,
    ChildSpecs = ?CHILD_SPECS,
    Add = fun(Spec) ->
        case ?MODULE:add(Spec) of
            {ok, _} -> true;
            %% start_child/2 may also answer {ok, Child, Info}; treat it as
            %% success instead of crashing with case_clause.
            {ok, _, _} -> true;
            {error, _} -> false
        end
    end,
    %% NOTE(review): odp_evn_brod_client and odp_cg_evn_csv_crate are not ids
    %% in ?CHILD_SPECS; presumably left over from children added elsewhere --
    %% confirm whether these deletes are still needed. Results are ignored.
    delete(odp_evn_brod_client),
    delete(odp_cg_state),
    delete(odp_cg_evn_csv_crate),
    case lists:all(Add, ChildSpecs) of
        true -> {ok, []};
        false -> {error, []}
    end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment