Skip to content

Instantly share code, notes, and snippets.

@zvkemp
Created March 9, 2016 17:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zvkemp/3cff279388b89745313e to your computer and use it in GitHub Desktop.
Annotated multi-process mapping module in Elixir
defmodule Parallel do
  @moduledoc """
  A minimal multi-process parallel mapper.

  `map/3` fans an enumerable out over a fixed pool of worker processes,
  applies `function` to each element on a worker, and collects the results
  back in the caller's mailbox. Results are returned in the original
  element order.
  """

  @doc """
  Maps `function` over `enum` using a pool of `n` worker processes.

  Each element is tagged with its index before being dispatched, so the
  returned list preserves the order of `enum` even though workers may
  finish out of order.
  """
  def map(enum, function, workers: n) do
    pool = worker_pool(n)

    # Lazily cycle the pool and pair each worker pid with an indexed
    # element: [{pid0, {e0, 0}}, {pid1, {e1, 1}}, ...]. The index lets us
    # restore original ordering after out-of-order completion.
    pool
    |> Stream.cycle()
    |> Stream.zip(Stream.with_index(enum))
    |> Enum.each(fn {pid, {element, index}} ->
      send(pid, {index, element, function})
    end)

    # Receive exactly one tagged result per dispatched element, then shut
    # the workers down so they don't linger in their receive loops.
    results = collect(Enum.count(enum), [])
    Enum.each(pool, &send(&1, :stop))

    results
    |> Enum.sort_by(fn {index, _value} -> index end)
    |> Enum.map(fn {_index, value} -> value end)
  end

  # Accumulates `remaining` tagged results from the mailbox.
  # Prepending to the accumulator is O(1); ordering is fixed up by the
  # caller's sort, so arrival order here does not matter. Matching only on
  # {:result, _, _} avoids swallowing unrelated mailbox messages.
  defp collect(0, acc), do: acc

  defp collect(remaining, acc) do
    receive do
      {:result, index, value} -> collect(remaining - 1, [{index, value} | acc])
    end
  end

  # Spawns `count` linked workers, each reporting back to this process.
  # Returns the list of worker pids so callers can stop them later.
  defp worker_pool(count) do
    # Capture the caller pid here: inside the spawned closure, self()
    # would refer to the worker, not the process collecting results.
    caller = self()
    Enum.map(1..count, fn _ -> spawn_link(fn -> remote_loop(caller) end) end)
  end

  # Worker loop: blocks in receive until it gets either a job or :stop.
  # On a job it applies the function and sends a tagged result back to
  # the caller, then recurses to wait for the next message.
  defp remote_loop(caller) do
    receive do
      :stop ->
        nil

      {index, data, function} ->
        send(caller, {:result, index, function.(data)})
        remote_loop(caller)
    end
  end
end
# Simulates a slow (IO-bound) operation: sleeps 1ms, returns its argument.
pause = fn x ->
  :timer.sleep(1)
  x
end

# Times Parallel.map over 2500 elements with the given worker-pool size
# and prints the wall-clock duration in seconds.
benchmark = fn workers ->
  IO.puts("#{workers} workers:")

  # :timer.tc/1 returns {elapsed_microseconds, result}; the mapped list
  # itself is not needed here, only the timing, so it is explicitly
  # discarded (silencing the unused-variable warning).
  {microseconds, _result} =
    :timer.tc(fn ->
      Parallel.map(1..2500, pause, workers: workers)
    end)

  IO.puts("in #{microseconds / 1_000_000.0}s")
end

benchmark.(1) #=> in 5.002589s
benchmark.(2) #=> in 2.500837s
benchmark.(5) #=> in 1.000895s
benchmark.(100) #=> in 0.051999s
benchmark.(250) #=> in 0.034882s
benchmark.(500) #=> in 0.038719s
benchmark.(2500) #=> in 0.05327s
# Mapping over 2500 elements at 1ms each, the time taken generally
# shrinks linearly with the number of workers (up to a point - the latency
# of inter-process communication is very small, but not nothing).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment