# Elixir attempt at pooled concurrency demo
defmodule ConcurrencyDemo do
  def num_procs(erlang \\ :erlang) do
    erlang.system_info(:logical_processors)
  end

  # Runs `func` on `nodename` and blocks until the result arrives.
  def query_node(nodename, func, node \\ Node) do
    me = self()
    pid = node.spawn(nodename, fn -> send(me, {self(), func.()}) end)
    receive do
      {^pid, result} -> result
    end
  end
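
  # Usage sketch (hypothetical iex session; the node name is made up):
  #
  #   ConcurrencyDemo.query_node(:"worker@host", fn -> :erlang.system_info(:logical_processors) end)
  #   #=> 8  (however many logical processors the remote node reports)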

  def machines(node \\ Node) do
    [node.self() | node.list()]
  end

  def num_total_workers_by_machine(machines \\ machines(), erlang \\ :erlang) do
    machines
    |> Enum.map(fn nodename ->
      {nodename, query_node(nodename, fn -> erlang.system_info(:logical_processors) end)}
    end)
  end
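
  # Result shape (node name and core count are illustrative here; cf. the
  # stubbed test below):
  #
  #   num_total_workers_by_machine([Node.self()], ErlangSystemInfoStub)
  #   #=> [{:nonode@nohost, 64}]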

  def num_total_workers(num_total_workers_by_machine \\ num_total_workers_by_machine()) do
    num_total_workers_by_machine
    |> Enum.reduce(0, fn {_, procs}, tot -> tot + procs end)
  end

  def worker_pool_by_nodenames(num_total_workers_by_machine \\ num_total_workers_by_machine()) do
    num_total_workers_by_machine
    |> Enum.flat_map(fn {w, procs} -> List.duplicate(w, procs) end)
  end
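
  # For example (node names are illustrative; this mirrors the test below):
  #
  #   worker_pool_by_nodenames([{:"a@b", 2}, {:"c@d", 2}])
  #   #=> [:"a@b", :"a@b", :"c@d", :"c@d"]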

  # Recursively halves a range until each chunk is smaller than approx_chunk_size.
  def split_range_of_numbers(%Range{first: start, last: finish}, approx_chunk_size \\ 150) do
    size = finish - start
    if size < approx_chunk_size do
      [start..finish]
    else
      halfway = trunc(start + size / 2)
      List.flatten([
        split_range_of_numbers(start..halfway, approx_chunk_size),
        split_range_of_numbers((halfway + 1)..finish, approx_chunk_size)
      ])
    end
  end
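
  # For example (mirroring the tests below):
  #
  #   split_range_of_numbers(1..100, 50)  #=> [1..50, 51..100]
  #   split_range_of_numbers(1..50, 50)   #=> [1..50]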

  # Allows mapping over a work collection using any nodes available.
  def pmap(collection, func, worker_pool_by_nodenames \\ worker_pool_by_nodenames(), node \\ Node) do
    # Get this process's PID
    me = self()
    IO.puts "Collection length is: #{length(collection)}"

    collection
    # First, assign a machine to receive each sub-workload
    |> Enum.zip(Stream.cycle(worker_pool_by_nodenames))
    |> Enum.map(fn {elem, machine_name} ->
      # For each element in the collection, spawn a process on machine_name
      # and tell it to:
      # - run the given function on that element
      # - call up the parent process
      # - send the parent its PID and its result
      # Each call to spawn_link returns the child PID immediately.
      IO.puts "I am spawning a worker on #{machine_name} for range: #{inspect elem}"
      node.spawn_link(machine_name, fn -> send(me, {self(), func.(elem)}) end)
    end)
    # Here we have the complete list of child PIDs. We don't yet know
    # which, if any, have completed their work.
    |> Enum.map(fn pid ->
      IO.puts "I am waiting on results from pid #{inspect pid}"
      # For each child PID, in order, block until we receive an answer from
      # that PID, then return it. While we're waiting on the first pid we
      # may receive results from others, but we won't pull those out of the
      # mailbox until we've finished with the first one.
      receive do
        {^pid, result} ->
          IO.puts "I got result #{result} from pid #{inspect pid}"
          result
      end
    end)
  end
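
  # Minimal usage sketch (identity function and a single-node stub pool,
  # mirroring the test below; a real call would pass real node names):
  #
  #   pmap([1, 2, 3], &(&1), [:"zombie@nation"], NodeStub)
  #   #=> [1, 2, 3]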

  # Bodyless signature so the defaults apply to both clauses below.
  def concurrent_factorial(range, worker_pool_by_nodenames \\ worker_pool_by_nodenames(), node \\ Node, chunk_size \\ nil)

  def concurrent_factorial(range = %Range{first: start, last: finish}, worker_pool_by_nodenames, node, chunk_size) do
    # If no chunk_size was given, split the work into roughly one chunk per
    # available core. (Bound here rather than inside an `unless` block,
    # since a rebinding inside the block wouldn't leak out of it.)
    chunk_size = chunk_size || max(div(finish - start, length(worker_pool_by_nodenames)), 1)
    IO.puts "Worker pool length is: #{length(worker_pool_by_nodenames)}."
    IO.puts "I have a range from #{start} to #{finish}. chunk_size is #{chunk_size}."
    work_chunks = split_range_of_numbers(range, chunk_size)
    IO.puts "Number of work chunks is: #{length(work_chunks)}. Pmapping..."

    pmap(
      work_chunks,
      fn subrange -> subrange |> Enum.reduce(&(&1 * &2)) end,
      worker_pool_by_nodenames,
      node
    )
    # One more reduction... WHICH, WHOOPS, TAKES THE LONGEST TIME OF ALL on
    # big ranges, since the enormous partial-product bignums get multiplied
    # together sequentially on a single core. ::slaps forehead::
    |> Enum.reduce(&(&1 * &2))
  end

  def concurrent_factorial(n, worker_pool_by_nodenames, node, chunk_size) when is_integer(n) and n > 0 do
    concurrent_factorial(1..n, worker_pool_by_nodenames, node, chunk_size)
  end
end
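
# A sketch of how this might be run across real machines (hypothetical node
# and file names; assumes this file is loaded on every node and that all
# nodes share the same Erlang cookie):
#
#   $ iex --sname worker1 concurrency_demo.exs   # on each worker machine
#   $ iex --sname main concurrency_demo.exs      # on the coordinator
#
#   iex(main@myhost)> Node.connect(:"worker1@myhost")
#   true
#   iex(main@myhost)> ConcurrencyDemo.concurrent_factorial(100_000)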

# run this inline suite with "elixir #{__ENV__.file} test"
if System.argv() |> List.first() == "test" do
  ExUnit.start()

  defmodule ConcurrencyDemoTest do
    use ExUnit.Case, async: true
    alias ConcurrencyDemo, as: T

    # Module-wide timeout so the core-flooding test at the bottom has room to run.
    @moduletag timeout: 90_000

    defmodule ErlangSystemInfoStub do
      def system_info(:logical_processors), do: 64 # one day...
    end

    defmodule NodeStub do
      # simulates a self-named node
      def self, do: :"zombie@nation"

      # simulates a remote spawn; just does it locally
      def spawn(_, func) do
        # IO.puts "I am spawning"
        Kernel.spawn(func)
      end

      def spawn_link(_, func) do
        # IO.puts "I am spawn_linking"
        Kernel.spawn_link(func)
      end

      def list, do: []
    end

    @num_cores_on_this_machine :erlang.system_info(:logical_processors)

    defp stubbed_worker_pool_by_nodenames do
      List.duplicate(:"zombie@nation", @num_cores_on_this_machine)
    end

    test "num_procs" do
      assert T.num_procs() > 0
    end

    test "query node" do
      assert T.query_node(NodeStub.self(), fn -> :erlang.system_info(:logical_processors) end, NodeStub) ==
               @num_cores_on_this_machine
    end

    test "machines length at least 1" do
      assert length(T.machines(NodeStub)) == 1
    end

    test "num_total_workers_by_machine" do
      assert T.num_total_workers_by_machine([Node.self()], ErlangSystemInfoStub) == [{Node.self(), 64}]
    end

    test "num_total_workers" do
      assert T.num_total_workers([{:"zombie@nation", 64}, {:"this@that", 64}]) == 128
    end

    test "worker_pool_by_nodenames" do
      assert T.worker_pool_by_nodenames([{:"a@b", 2}, {:"c@d", 2}]) == [:"a@b", :"a@b", :"c@d", :"c@d"]
    end

    test "split_range_of_numbers" do
      assert T.split_range_of_numbers(1..100, 50) == [1..50, 51..100]
      assert T.split_range_of_numbers(1..50, 50) == [1..50]
    end

    test "pmap" do
      assert T.pmap([1, 2, 3], &(&1), [:"zombie@nation"], NodeStub) == [1, 2, 3]
    end

    test "concurrent_factorial_n" do
      assert T.concurrent_factorial(10) == 3628800
      assert T.concurrent_factorial(10, stubbed_worker_pool_by_nodenames(), NodeStub, 3) == 3628800
      # Whatevs, the real point here is core flooding: this one spends nearly
      # all of its time in the recombination-of-results step. LOL
      assert T.concurrent_factorial(1000000, stubbed_worker_pool_by_nodenames(), NodeStub, 200000) > 0
    end
  end
end