Skip to content

Instantly share code, notes, and snippets.

@fmcgeough
Created August 24, 2018 18:51
Show Gist options
  • Select an option

  • Save fmcgeough/c8e2f459ac3aa034bac1ed2b6abc6402 to your computer and use it in GitHub Desktop.

Select an option

Save fmcgeough/c8e2f459ac3aa034bac1ed2b6abc6402 to your computer and use it in GitHub Desktop.
defmodule Throttle do
alias __MODULE__
defstruct ms_interval: 1000,
initial_func: nil,
handle_func: nil,
final_func: nil,
batch_size: 5,
data: []
def new(data, initial_fn, handle_fn, final_fn, ms_interval \\ 1000, batch_size \\ 5)
when is_list(data) and handle_fn != nil do
%Throttle{
data: data,
initial_func: initial_fn,
handle_func: handle_fn,
final_func: final_fn,
ms_interval: ms_interval,
batch_size: batch_size
}
end
end
defmodule ApiThrottle do
@moduledoc """
Provide a module to allow throttling API calls by doing callbacks to
caller on a scheduled basis.
"""
use GenServer
require Logger
def start(throttle = %Throttle{}, key) do
Logger.info(
"Starting an #{__MODULE__} for #{inspect(key)} - #{Enum.count(throttle.data)} items."
)
GenServer.start_link(__MODULE__, {throttle, 0, key}, name: key)
end
@doc """
On startup have a callback to init caller who wants to throttle API
"""
def init(state) do
schedule_action(:init, 1)
{:ok, state}
end
@doc """
After init start processing work.
On first call we'll process_api_data immediately (1ms) but
thereafter we use caller's specified throttle.ms_interval
"""
def handle_info(:init, {throttle, _offset, _key} = state) do
if throttle.initial_func do
throttle.initial_func.(throttle)
end
schedule_action(:work, 1)
{:noreply, state}
end
@doc """
process_api_data and update GenServer state so we have less data
If we hit final data then we schedule a finalize call.
"""
def handle_info(:work, {throttle, offset, key} = throttle_info) do
Logger.debug(fn ->
"#{__MODULE__} #{key} :work, time = #{DateTime.utc_now()}, #{offset} out of #{
Enum.count(throttle.data)
}."
end)
process_api_data(throttle_info)
{:noreply, {throttle, offset + throttle.batch_size, key}}
end
@doc """
We've processed all caller data so call their finalize function
and stop.
"""
def handle_info(:finalize, {throttle, _offset, key} = state) do
Logger.info("Completed an #{__MODULE__} #{key} for #{Enum.count(throttle.data)} items.")
if throttle.final_func do
throttle.final_func.(throttle)
end
{:stop, :normal, state}
end
defp done_with_data?(data, offset) do
Enum.at(data, offset) == nil
end
defp process_api_data({throttle, offset, _key}) do
case done_with_data?(throttle.data, offset) do
true ->
schedule_action(:finalize, 1)
false ->
Enum.slice(throttle.data, offset, throttle.batch_size)
|> Enum.each(fn item -> throttle.handle_func.(item) end)
schedule_action(:work, throttle.ms_interval)
end
end
defp schedule_action(action, ms) do
Process.send_after(self(), action, ms)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment