File.read!("source")
|> String.split("\n")
|> Enum.flat_map(&String.split/1)
|> Enum.reduce(%{}, fn word, map ->
  Map.update(map, word, 1, & &1 + 1)
end)
=> %{ "are" => 2, "blue" => 1, ... }
- Simple
- Efficient for small collections
- Inefficient for large collections
File.stream!("source", :line)
|> Stream.flat_map(&String.split/1)
|> Enum.reduce(%{}, fn word, map ->
  Map.update(map, word, 1, & &1 + 1)
end)
- Folds computations, going item by item
- Less memory usage at the cost of extra computation
- Allows us to work with large or infinite collections, as in the sketch below
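A minimal sketch (not from the original slides): because streams are lazy, even an infinite source is fine as long as we only take what we need.

# Infinite stream of integers; nothing runs until Enum.take/2 asks.
Stream.iterate(0, & &1 + 1)
|> Stream.map(& &1 * 2)
|> Enum.take(5)
#=> [0, 2, 4, 6, 8]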
File.stream!("source", :line)
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.reduce(fn -> %{} end, fn word, map ->
  Map.update(map, word, 1, & &1 + 1)
end)
|> Enum.into(%{})
- Gives up ordering and process locality in exchange for concurrency
- Ordering is unimportant here, but what if it is important?
- Tools for working with bounded and unbounded data
- It is not magic! There is an overhead when data flows through processes
- Requires volume and/or CPU- or IO-bound work to see benefits
- ~1200 LOC
- ~1300 LOD (lines of documentation)
- It is a new behaviour
- Exchanges data between stages transparently, with back-pressure
- Breaks into producers, consumers, and producer-consumers
producer -> producer-consumer -> producer-consumer -> producer-consumer -> consumer
- The consumer subscribes to the producer
- The consumer sends demand ("ask for 10")
- The producer sends events ("send at most 10")
- It is a message contract (see the sketch below)
- It pushes back-pressure to the boundary
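A minimal sketch of the contract, adapted from the counter example in the GenStage docs (Counter, Doubler, and Printer are illustrative names):

defmodule Counter do
  use GenStage

  def init(initial), do: {:producer, initial}

  # Demand arrives as a message; reply with at most `demand` events.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..counter + demand - 1)
    {:noreply, events, counter + demand}
  end
end

defmodule Doubler do
  use GenStage

  def init(:ok), do: {:producer_consumer, :no_state}

  # A producer-consumer transforms the events flowing through it.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, & &1 * 2), state}
  end
end

defmodule Printer do
  use GenStage

  def init(:ok), do: {:consumer, :no_state}

  def handle_events(events, _from, state) do
    IO.inspect(events)
    {:noreply, [], state}
  end
end

{:ok, counter} = GenStage.start_link(Counter, 0)
{:ok, doubler} = GenStage.start_link(Doubler, :ok)
{:ok, printer} = GenStage.start_link(Printer, :ok)

# Each consumer subscribes and asks for at most 10 events at a time.
GenStage.sync_subscribe(doubler, to: counter, max_demand: 10)
GenStage.sync_subscribe(printer, to: doubler, max_demand: 10)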
- Dispatchers (one per producer):
- Effectively receive the demand and send data
- Allow a producer to dispatch to multiple consumers at once
- Support generic producers (see the sketch below)
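A sketch of picking a dispatcher (Broadcaster is an illustrative name; GenStage.DemandDispatcher is the default when none is given):

defmodule Broadcaster do
  use GenStage

  # GenStage.BroadcastDispatcher sends each event to all consumers at once;
  # GenStage.PartitionDispatcher hashes events across a fixed set of partitions.
  def init(state) do
    {:producer, state, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(_demand, state) do
    # Demo only: a real producer would emit events here.
    {:noreply, [], state}
  end
end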
- Replace GenEvent
- Introduce DynamicSupervisor (sketched below)
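DynamicSupervisor later shipped with Elixir itself (v1.6). A minimal usage sketch, with MyWorker as a hypothetical child module:

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Children are started on demand instead of being listed up front.
{:ok, _pid} = DynamicSupervisor.start_child(sup, {MyWorker, :some_arg})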
File.stream!("source", :line)
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, map ->
  Map.update(map, word, 1, & &1 + 1)
end)
|> Enum.into(%{})
- Flow.from_enumerable() creates a GenStage with a DemandDispatcher and multiple producer-consumers
- Flow.flat_map(&String.split/1) defines what each producer-consumer does
- Flow.partition() causes each producer-consumer to partition its output to the next set of consumers
- Flow.reduce(fn -> %{} end, fn word, map -> Map.update(map, word, 1, & &1 + 1) end): reduce/3 collects all data into maps; when it is done, the maps are streamed
- into/2 collects the state into a map
- Breaks down when the input is infinite: reduce/3 requires the data to end
- If reduce/3 runs until all data is processed... what happens on unbounded data?
- See the Flow.Window documentation (a sketch follows)...
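A sketch of windowing under the Flow v1.0 API (trigger semantics changed across Flow versions): a global window that triggers every 1000 events, emitting the counts so far instead of waiting for the data to end.

window = Flow.Window.global() |> Flow.Window.trigger_every(1000)

lines                             # any unbounded stream of lines
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split/1)
|> Flow.partition(window: window)
|> Flow.reduce(fn -> %{} end, fn word, map ->
  Map.update(map, word, 1, & &1 + 1)
end)
|> Flow.on_trigger(fn counts ->
  # Emit the accumulated map downstream on every trigger, keep reducing.
  {[counts], counts}
end)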
- Use @behaviour GenStage to make it an optional dep