Skip to content

Instantly share code, notes, and snippets.

@linjunpop
Created December 25, 2018 15:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linjunpop/3cf6275c8823310f7c33539753d9500a to your computer and use it in GitHub Desktop.
Save linjunpop/3cf6275c8823310f7c33539753d9500a to your computer and use it in GitHub Desktop.
Building A continues data processing system
defmodule Archiver.Fetcher do
use GenStage
def start_link(args) do
GenStage.start_link(__MODULE__, args, name: __MODULE__)
end
def init(state), do: {:producer, state}
def handle_demand(demand, state) do
items = Database.get_items()
{:noreply, items, state}
end
end
defmodule Archiver.Uploader do
use GenStage
def start_link(args) do
GenStage.start_link(__MODULE__, args, name: __MODULE__)
end
def init(_state) do
{:consumer, :the_state_does_not_matter, subscribe_to: [Archiver.Fetcher]}
end
def handle_events(items, _from, _state) do
items
|> Enum.map(&Task.start_link(Dropbox, :upload, [&1]))
{:noreply, [], :the_state_does_not_matter}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment