Skip to content

Instantly share code, notes, and snippets.

@sb8244
Created May 7, 2018 16:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sb8244/38cca1f7f1ffd50845a724c2d3c2fbbe to your computer and use it in GitHub Desktop.
Save sb8244/38cca1f7f1ffd50845a724c2d3c2fbbe to your computer and use it in GitHub Desktop.
Anonymized producer/consumer for triggering events from a controller
defmodule FeedItemProducer do
use GenStage
def start_link(:nameless) do
GenStage.start_link(__MODULE__, :ok)
end
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
{:producer, [], [buffer_size: 50_000]}
end
def notify(item = %FeedItem{}, pid \\ __MODULE__) do
GenStage.cast(pid, {:notify, item})
end
def handle_cast({:notify, item}, state) do
{:noreply, [%{item: item, at: Utils.Time.unix_now()}], state}
end
def handle_demand(demand, keys) when demand > 0 do
{:noreply, [], keys}
end
end
defmodule FeedItemConsumer do
use ConsumerSupervisor
def start_link() do
ConsumerSupervisor.start_link(__MODULE__, :ok, name: __MODULE__)
end
def init(:ok) do
children = [
worker(FeedItemServer, [], restart: :transient)
]
# max concurrency is 3
opts = [strategy: :one_for_one, subscribe_to: [{FeedItemProducer, max_demand: 3, min_demand: 1}]]
ConsumerSupervisor.init(children, opts)
end
end
defmodule FeedItemServer do
def start_link(%{item: event_feed_item, at: unix_inserted_at}) do
Task.start_link(fn ->
FeedItem.notify(event_feed_item)
end)
end
end
# start the consumer & producer in your Application supervisor, as per the GenStage docs
# You can call FeedItemProducer.notify(item) at any time, from a controller or elsewhere
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment