Example usage of Task.async_stream/3
# Take every number from 1 to 50 million, sum the integers from 0 up to that number, then sum those sums
:timer.tc(fn ->
  1..50_000_000
  |> Enum.map(&Enum.sum(0..&1))
  |> Enum.sum()
end)
# Takes almost 2 minutes to run and returns the value 20,833,334,583,333,350,000,000
# => {108521694, 20833334583333350000000}
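# As a sanity check, the result has a closed form: the sum of the first n triangular
# numbers is n(n + 1)(n + 2) / 6. A quick sketch, not part of the original benchmark:
n = 50_000_000
expected = div(n * (n + 1) * (n + 2), 6)
# => 20833334583333350000000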
# With `Task.async_stream`, this can be made concurrent simply by replacing
# `Enum.map`, but there's a problem: doing so takes almost 13 minutes,
# about 7 times longer than the sequential version!
# Every element gets its own task, and the overhead of that concurrency
# outweighs the actual processing of a single element.
:timer.tc(fn ->
  1..50_000_000
  |> Task.async_stream(fn i -> Enum.sum(0..i) end,
    max_concurrency: System.schedulers_online() * 2
  )
  |> Stream.map(&elem(&1, 1))
  |> Enum.sum()
end)
# Takes almost 13 minutes to run! Returns the same value of 20,833,334,583,333,350,000,000
# => {88103825, 20833334583333350000000}
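# Why the `elem(&1, 1)` step is needed: `Task.async_stream` emits `{:ok, result}` tuples
# rather than bare results. A small illustration (the 1..3 input is only for demonstration):
results =
  1..3
  |> Task.async_stream(fn i -> Enum.sum(0..i) end)
  |> Enum.to_list()
# => [ok: 1, ok: 3, ok: 6]
# Pattern matching on the tuple is an equivalent, slightly more explicit way to unwrap it:
total =
  1..3
  |> Task.async_stream(fn i -> Enum.sum(0..i) end)
  |> Stream.map(fn {:ok, sum} -> sum end)
  |> Enum.sum()
# => 10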
# This can be remedied by batching: use `Enum.chunk_every` to split the input into a list of
# lists, so that each task has enough work to amortize the concurrency overhead.
:timer.tc(fn ->
  1..50_000_000
  |> Enum.chunk_every(312_500) # 160 equal chunks, so the tasks divide into even rounds
  |> Task.async_stream(
    fn chunk ->
      # each task sums a whole chunk instead of a single element
      chunk
      |> Enum.map(&Enum.sum(0..&1))
      |> Enum.sum()
    end,
    max_concurrency: System.schedulers_online() * 2
  )
  |> Stream.map(&elem(&1, 1))
  |> Enum.sum()
end)
# (my machine reports 4 from `System.schedulers_online()`: 2 physical, 2 logical cores)
# 57 seconds! Returns the same value of 20,833,334,583,333,350,000,000
# => {57242940, 20833334583333350000000}
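# A possible variation (an assumption, not measured here): `Stream.chunk_every/2` builds the
# chunks lazily, which should avoid holding the full list of lists in memory at once.
# `ChunkedSum` is a hypothetical helper module, sketched on a small range to show that the
# chunked, concurrent version matches the sequential result:
defmodule ChunkedSum do
  def run(range, chunk_size) do
    range
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(
      fn chunk -> chunk |> Enum.map(&Enum.sum(0..&1)) |> Enum.sum() end,
      max_concurrency: System.schedulers_online() * 2
    )
    |> Stream.map(fn {:ok, sum} -> sum end)
    |> Enum.sum()
  end
end

sequential = 1..1_000 |> Enum.map(&Enum.sum(0..&1)) |> Enum.sum()
ChunkedSum.run(1..1_000, 100) == sequential
# => true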