Skip to content

Instantly share code, notes, and snippets.

@sanmiguel
Last active September 2, 2016 15:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sanmiguel/d73056adb6cffc9954327d8fcd89cd48 to your computer and use it in GitHub Desktop.
Save sanmiguel/d73056adb6cffc9954327d8fcd89cd48 to your computer and use it in GitHub Desktop.

Flow

Eager model:

File.read!("source")
|> String.split("\n")
|> Enum.flat_map(&String.split/1)
|> Enum.reduce(%{}, fn word, map ->
    Map.update(map, word, 1, & &1 + 1)
  end)

=> %{ "are" => 2, "blue" => 1, ... }
  • Simple
  • Efficient for small collections
  • Inefficient for large collections

Lazy model

File.stream!("source", :line)
|> Stream.flat_map(&String.split/1)
|> Enum.reduce(%{}, fn word, map ->
    Map.update(map, word, 1, & &1 + 1)
  end)
  • Folds computations, goes item by item
  • Less memory usage at the cost of computation
  • Allow us to work with large or infinite collections

Flow model

File.stream!("source", :line)
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.reduce(fn -> %{} end, fn word, map -> Map.update(map, word, 1, & &1 + 1) end)
|> Enum.into(%{})
  • Give up ordering and process locality for concurrency
  • Ordering unimportant: what if it is important?
  • Tools for working with bounded and unbounded data
  • It is not magic! There is an overhead when data flows through processes
  • Requires volume and/or cpu/io-bound work to see benefits

Flow stats

  • ~1200 LOC
  • ~1300 LOD (L.o.Documentation)

GenStage

  • It is a new behaviour
  • Exchange data between stages transparently with back-pressure
  • Breaks into producers, consumers and producer_consumers
producer -> producer-consumer -> producer-consumer -> producer-consumer -> consumer

Demand-driven

  1. consumer subscribes to producer
  2. consumer sends demand ("Asks 10")
  3. producer sends events ("Sends max 10")

GenStage: Demand-driven

  • It is a message contract
  • It pushes back-pressure to the boundary

Example in announcement

GenStage dispatchers

  • Pre producer
  • Effectively receive the demand and send data
  • Allow a producer to dispatch to multiple consumers at once

GenStage goals

  • Support generic producers
  • Replace GenEvent
  • Introduce DynamicSupervisor

Flow

File.stream!("source", :line)
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, map -> Map.update(map, word, 1, & &1 + 1) end)
|> Enum.into(%{})
  • Flow.from_enumerable() creates a GenStage with DemandDispatcher and multiple producer-consumers

  • Flow.flat_map(&String.split/1) defines what each producer-consumer does

  • Flow.partition() causes each producer-consumer to partition output to next set of consumers

  • Flow.reduce(fn -> %{} end, fn word, map -> Map.update(map, word, 1, & &1 + 1) end)

  • reduce/3 collects all data into maps

  • when it is done the maps are streamed

  • into/2 collects the state into a map

  • breaks down when the input is infinite: reduce/3 requires the data to end

Windows and Triggers

  • If reduce/3 runs until all data is processed... what happens on unbounded data?
  • See Flow.Window documentation...

Tips

  • Use @behaviour GenStage to make it an optional dep
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment