Skip to content

Instantly share code, notes, and snippets.

@emilsoman
Last active March 1, 2017 14:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save emilsoman/bb83608f5d0ad456657cf024f859f2fe to your computer and use it in GitHub Desktop.
Save emilsoman/bb83608f5d0ad456657cf024f859f2fe to your computer and use it in GitHub Desktop.
Inactivity-based trigger for Flow
defmodule MyFlow.Producer do
  @moduledoc """
  GenStage producer that buffers pushed user ids and announces inactivity.

  Every `push/1` restarts an inactivity timer. When no push arrives for
  `@inactivity_timeout` milliseconds the process receives `:trigger` and queues
  a `{:producer, :done}` notification through `GenStage.async_notify/2`.

  The callback state is `{{queue, pending_demand}, timer}`:

    * `queue` - Erlang `:queue` of user ids that have not been dispatched yet,
      kept in arrival order
    * `pending_demand` - consumer demand that has not been satisfied yet
    * `timer` - reference of the running inactivity timer, or `nil`
  """

  use GenStage

  # Milliseconds without a push before `:trigger` is delivered.
  @inactivity_timeout 3_000

  @doc """
  Starts the producer and registers it under the module name.
  """
  def start_link do
    GenStage.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @doc """
  Hands `user_id` to the producer and restarts the inactivity timer.

  This is a synchronous call; it returns `:ok` once the producer has accepted
  the id.
  """
  def push(user_id) do
    GenStage.call(__MODULE__, {:push, user_id})
  end

  # Starts with an empty buffer, no pending demand and no running timer.
  def init(nil) do
    {:producer, {{:queue.new(), 0}, nil}}
  end

  # Buffers the pushed id, emits whatever the pending demand allows and
  # restarts the inactivity timer.
  def handle_call({:push, user_id}, _from, {{queue, demand}, timer}) do
    cancel_inactivity_timer(timer)
    {events, buffer} = dispatch({:queue.in(user_id, queue), demand})
    new_timer = Process.send_after(self(), :trigger, @inactivity_timeout)
    {:reply, :ok, events, {buffer, new_timer}}
  end

  # The inactivity timer fired: announce it and forget the expired timer.
  def handle_info(:trigger, {buffer, _timer}) do
    # NOTE(review): `GenStage.async_notify/2` is only available in older
    # GenStage releases - confirm against the version pinned by the project.
    GenStage.async_notify(self(), {:producer, :done})
    {:noreply, [], {buffer, nil}}
  end

  # Adds the new demand to the unsatisfied demand and emits buffered ids.
  def handle_demand(demand, {{queue, pending_demand}, timer}) when demand > 0 do
    {events, buffer} = dispatch({queue, demand + pending_demand})
    {:noreply, events, {buffer, timer}}
  end

  # Takes up to `demand` ids from the front of the queue (oldest first).
  # Returns `{events, {remaining_queue, remaining_demand}}`.
  defp dispatch({queue, demand}) do
    # :queue.split/2 requires the count to be within the queue length.
    count = min(:queue.len(queue), demand)
    {taken, rest} = :queue.split(count, queue)
    {:queue.to_list(taken), {rest, demand - count}}
  end

  # Cancels the running inactivity timer, if any.
  defp cancel_inactivity_timer(nil), do: :ok

  defp cancel_inactivity_timer(timer) do
    case Process.cancel_timer(timer) do
      false ->
        # The timer already expired. Its `:trigger` message may still be
        # waiting in the mailbox; drop it so a push is never followed by a
        # stale inactivity notification.
        receive do
          :trigger -> :ok
        after
          0 -> :ok
        end

      _remaining_ms ->
        :ok
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment