@sferik sferik/pmap.rb
Created Mar 1, 2014

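def pmap(enum)
  # Without a block, return an Enumerator that calls pmap(enum) when iterated.
  return to_enum(:pmap, enum) unless block_given?
  # One thread per element; Thread#value waits for each thread and returns
  # its block's result, so the results come back in the original order.
  enum.map { |e| Thread.new { yield e } }.map(&:value)
end

# Returns elements in order, as expected.
pmap(1..10) { |e| e } #=> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Returns elements in nondeterministic order on MRI >= 1.9.3.
# Works as expected on JRuby, Rubinius, and earlier versions of MRI.
pmap(1..10).to_a #=> [7, 2, 3, 4, 6, 5, 9, 8, 10, 1]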
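To see where the nondeterminism comes from, here is a small sketch (not from the gist; the call_order array, the Mutex and the random sleep are illustrative additions) that records the order in which the block actually runs inside the worker threads, next to the array that map(&:value) returns. The block is invoked from each thread whenever that thread gets scheduled, and that invocation order is what the Enumerator's to_a collector sees, while Thread#value still returns results in creation order.

```ruby
# Sketch: compare the order in which the block runs with the order of the results.
def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  enum.map { |e| Thread.new { yield e } }.map(&:value)
end

call_order = []   # hypothetical log of when each block finished
lock = Mutex.new  # the block runs on many threads, so guard the shared array

results = pmap(1..10) do |e|
  sleep(rand * 0.05)                    # simulate uneven work per element
  lock.synchronize { call_order << e }  # record completion order
  e * 2
end

p results     # => [2, 4, 6, 8, 10, 12, 14, 16, 18, 20] (always input order)
p call_order  # completion order; usually shuffled and different on each run
```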

cromwellryan commented Mar 1, 2014

Line 4 of the gist (enum.map { |e| Thread.new { yield e } }) maps the enumeration to threads. Each Thread.new call returns immediately, independent of whether the thread's block has completed; .value then blocks until each thread finishes, possibly long after it was created.

This shows that the threads' blocks are not evaluated in order. The Thread instances themselves are still created in order, because map calls Thread.new sequentially, so map(&:value) returns the results in order.

def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  # Sleep a random 0-1 s, then print e, so the printed order reveals when each block runs.
  enum.map { |e| Thread.new { sleep(Random.rand); p e; yield e } }.map(&:value)
end

pmap(1..10) { |e| e }

Printed output (the order varies between runs):
1
2
5
6
8
3
7
9
4
10
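A complementary sketch of the timing described above, assuming a fixed 0.1 s sleep as stand-in work (the now helper is a hypothetical convenience): creating the threads returns almost at once, and the wall-clock wait only happens when Thread#value joins them. Because sleep does not hold the interpreter lock, the three sleeps overlap on MRI as well.

```ruby
# Sketch: Thread.new returns immediately; Thread#value blocks until the thread finishes.
def now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

start   = now
threads = (1..3).map { |e| Thread.new { sleep(0.1); e } } # stand-in work
spawned = now - start          # time to create all three threads
values  = threads.map(&:value) # joins each thread, in creation order
elapsed = now - start

puts format("spawned in %.4fs, finished in %.3fs", spawned, elapsed)
p values # => [1, 2, 3]
```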

mkdynamic commented Mar 1, 2014

Maybe it's clearer written this way?

def pmap(enum)
  if block_given?
    enum.map { |e| Thread.new { yield e } }.map(&:value)
  else
    Enumerator.new do |yielder|
      enum.map { |e| Thread.new { yielder.yield e } }.each(&:join)
    end
  end
end

sferik commented Mar 1, 2014

@mkdynamic Why does the second map become each in the case when no block is passed?


mkdynamic commented Mar 1, 2014

@sferik Ah, sorry, ignore that; it was unintentional. It works the same with map.

The key part is that the yield in the thread block passes each element directly to the yielder in the context of the Enumerator, so results are accumulated as soon as each thread evaluates them.

Provided I'm not misunderstanding something, which is a distinct possibility :)
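One way to check this explanation, sketched below with mkdynamic's version (using map(&:value) in the blockless branch, as he notes it makes no difference): record which thread runs the consumer's block. Because yielder.yield is called inside each Thread, the block passed to each executes on the worker threads, so the consumer sees elements in whatever order those threads get scheduled. The delays and the arrivals log are illustrative assumptions, and only iteration with each is exercised here.

```ruby
# mkdynamic's Enumerator version, with map(&:value) in the blockless branch.
def pmap(enum)
  if block_given?
    enum.map { |e| Thread.new { yield e } }.map(&:value)
  else
    Enumerator.new do |yielder|
      # Each thread hands its element straight to the consumer's block.
      enum.map { |e| Thread.new { yielder.yield e } }.map(&:value)
    end
  end
end

arrivals = []  # hypothetical log of [element, thread that ran the block]
lock = Mutex.new

pmap([0.03, 0.01, 0.02]).each do |delay|
  sleep(delay)  # stand-in work, executed inside the worker thread
  lock.synchronize { arrivals << [delay, Thread.current] }
end

p arrivals.map(&:first)  # arrival order, typically shortest delay first
```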


sferik commented Mar 1, 2014

@mkdynamic @cromwellryan That makes sense. Thanks for explaining.

In case anyone else is curious, this is how I was able to get the correct ordering for pmap:

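def pmap_with_index(enum)
  return to_enum(:pmap_with_index, enum) unless block_given?
  enum.map.with_index { |e, i| Thread.new { yield e, i } }.map(&:value)
end

def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  # sort_by collects the [element, index] pairs the threads yield (in completion
  # order) and restores the original order by index. The final map then yields
  # to the caller's block sequentially, on the calling thread.
  pmap_with_index(enum).sort_by { |_, i| i }.map { |e, _| yield e }
end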
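A small check of sferik's version (a sketch; the runners array is an illustrative addition): the blockless form now returns elements in order, but because the final map yields on the calling thread, the caller's block does not run in parallel. Only the yield e, i into sort_by's collector happens inside the worker threads.

```ruby
def pmap_with_index(enum)
  return to_enum(:pmap_with_index, enum) unless block_given?
  enum.map.with_index { |e, i| Thread.new { yield e, i } }.map(&:value)
end

def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  pmap_with_index(enum).sort_by { |_, i| i }.map { |e, _| yield e }
end

p pmap(1..10).to_a # => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Record which thread runs the caller's block (illustrative only).
runners = []
doubled = pmap(1..4) { |e| runners << Thread.current; e * 2 }
p doubled                           # => [2, 4, 6, 8]
p runners.uniq == [Thread.current]  # => true: every call ran on this thread
```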