Created
August 24, 2018 18:51
-
-
Save fmcgeough/c8e2f459ac3aa034bac1ed2b6abc6402 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| defmodule Throttle do | |
| alias __MODULE__ | |
| defstruct ms_interval: 1000, | |
| initial_func: nil, | |
| handle_func: nil, | |
| final_func: nil, | |
| batch_size: 5, | |
| data: [] | |
| def new(data, initial_fn, handle_fn, final_fn, ms_interval \\ 1000, batch_size \\ 5) | |
| when is_list(data) and handle_fn != nil do | |
| %Throttle{ | |
| data: data, | |
| initial_func: initial_fn, | |
| handle_func: handle_fn, | |
| final_func: final_fn, | |
| ms_interval: ms_interval, | |
| batch_size: batch_size | |
| } | |
| end | |
| end | |
| defmodule ApiThrottle do | |
| @moduledoc """ | |
| Provide a module to allow throttling API calls by doing callbacks to | |
| caller on a scheduled basis. | |
| """ | |
| use GenServer | |
| require Logger | |
| def start(throttle = %Throttle{}, key) do | |
| Logger.info( | |
| "Starting an #{__MODULE__} for #{inspect(key)} - #{Enum.count(throttle.data)} items." | |
| ) | |
| GenServer.start_link(__MODULE__, {throttle, 0, key}, name: key) | |
| end | |
| @doc """ | |
| On startup have a callback to init caller who wants to throttle API | |
| """ | |
| def init(state) do | |
| schedule_action(:init, 1) | |
| {:ok, state} | |
| end | |
| @doc """ | |
| After init start processing work. | |
| On first call we'll process_api_data immediately (1ms) but | |
| thereafter we use caller's specified throttle.ms_interval | |
| """ | |
| def handle_info(:init, {throttle, _offset, _key} = state) do | |
| if throttle.initial_func do | |
| throttle.initial_func.(throttle) | |
| end | |
| schedule_action(:work, 1) | |
| {:noreply, state} | |
| end | |
| @doc """ | |
| process_api_data and update GenServer state so we have less data | |
| If we hit final data then we schedule a finalize call. | |
| """ | |
| def handle_info(:work, {throttle, offset, key} = throttle_info) do | |
| Logger.debug(fn -> | |
| "#{__MODULE__} #{key} :work, time = #{DateTime.utc_now()}, #{offset} out of #{ | |
| Enum.count(throttle.data) | |
| }." | |
| end) | |
| process_api_data(throttle_info) | |
| {:noreply, {throttle, offset + throttle.batch_size, key}} | |
| end | |
| @doc """ | |
| We've processed all caller data so call their finalize function | |
| and stop. | |
| """ | |
| def handle_info(:finalize, {throttle, _offset, key} = state) do | |
| Logger.info("Completed an #{__MODULE__} #{key} for #{Enum.count(throttle.data)} items.") | |
| if throttle.final_func do | |
| throttle.final_func.(throttle) | |
| end | |
| {:stop, :normal, state} | |
| end | |
| defp done_with_data?(data, offset) do | |
| Enum.at(data, offset) == nil | |
| end | |
| defp process_api_data({throttle, offset, _key}) do | |
| case done_with_data?(throttle.data, offset) do | |
| true -> | |
| schedule_action(:finalize, 1) | |
| false -> | |
| Enum.slice(throttle.data, offset, throttle.batch_size) | |
| |> Enum.each(fn item -> throttle.handle_func.(item) end) | |
| schedule_action(:work, throttle.ms_interval) | |
| end | |
| end | |
| defp schedule_action(action, ms) do | |
| Process.send_after(self(), action, ms) | |
| end | |
| end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment