@sferik
Created March 1, 2014 02:44
def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  enum.map { |e| Thread.new { yield e } }.map(&:value)
end

# Returns elements in order, as expected.
pmap(1..10) { |e| e } #=> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Returns elements in nondeterministic order on MRI >= 1.9.3.
# Works as expected on JRuby, Rubinius, and earlier versions of MRI.
pmap(1..10).to_a #=> [7, 2, 3, 4, 6, 5, 9, 8, 10, 1]

@cromwellryan

Line 4 maps each element of the enumeration to a thread. Thread.new returns immediately, regardless of when the thread's block finishes; .value then blocks until that thread completes, which can be long after it was created.

The following shows that the thread blocks are not evaluated in order, but the Thread instances themselves are created in order, so .map(&:value) collects the results in order.

def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  enum.map { |e| Thread.new { sleep(Random.rand); p e; yield e } }.map(&:value) 
end

pmap(1..10) { |e| e } 

1
2
5
6
8
3
7
9
4
10
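
The same point can be checked without reading printed output. This is only a minimal sketch: pmap_values is a hypothetical name for the block-taking branch of pmap, and the sleep durations are an assumption chosen to make the completion order likely, not guaranteed.

```ruby
# Hypothetical helper name; same body as the block-taking branch of pmap.
def pmap_values(enum)
  enum.map { |e| Thread.new { yield e } }.map(&:value)
end

finished = Queue.new # thread-safe log of the order in which blocks complete

results = pmap_values([3, 1, 2]) do |e|
  sleep(e * 0.05) # assumption: smaller elements tend to finish sooner
  finished << e
  e * 10
end

# Drain the queue into an array once every thread has been joined by .value.
completion_order = Array.new(finished.size) { finished.pop }

p results          # [30, 10, 20]: one value per thread, read in creation order
p completion_order # some permutation of [3, 1, 2], depending on scheduling
```

The result array follows the order in which the threads were created, because .value is called on them in that order, whatever order the blocks happened to finish in.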

@mkdynamic

Maybe it's clearer written this way?

def pmap(enum)
  if block_given?
    enum.map { |e| Thread.new { yield e } }.map(&:value)
  else
    Enumerator.new do |yielder|
      enum.map { |e| Thread.new { yielder.yield e } }.each(&:join)
    end
  end
end

@sferik
Author

sferik commented Mar 1, 2014

@mkdynamic Why does the second map become each in the case when no block is passed?

@mkdynamic

@sferik Ah, sorry, ignore that; it was unintentional. It behaves the same with map.

The key part is that the yield in the thread block yields the result directly to the yielder in the context of the Enumerator. So results are accumulated as soon as they are evaluated in the thread, in whatever order the threads finish.

Provided I am not misunderstanding something, which is a distinct possibility :)
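
To make that effect reproducible rather than random, here is a rough sketch that forces the threads to finish in reverse order. The helper name reverse_finishing_enum and the Queue-based gates are assumptions added purely for illustration; they are not part of the gist.

```ruby
# Hypothetical helper: each thread waits on a gate so that the threads
# yield in reverse creation order, making the outcome deterministic.
def reverse_finishing_enum(items)
  Enumerator.new do |yielder|
    gates = Array.new(items.size) { Queue.new }
    gates.last << :go unless gates.empty? # only the last thread may proceed
    threads = items.each_with_index.map do |e, i|
      Thread.new do
        gates[i].pop                 # wait for this thread's turn
        yielder.yield e              # delivered from inside the thread
        gates[i - 1] << :go if i > 0 # release the previous thread
      end
    end
    threads.each(&:join)
  end
end

seen = []
reverse_finishing_enum([1, 2, 3]).each { |e| seen << e }
p seen # [3, 2, 1]
```

The consumer sees elements in the order the threads call the yielder, not the order in which the threads were created, which matches the explanation above.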

@sferik
Author

sferik commented Mar 1, 2014

@mkdynamic @cromwellryan That makes sense. Thanks for explaining.

In case anyone else is curious, this is how I was able to get the correct ordering for pmap:

def pmap_with_index(enum)
  return to_enum(:pmap_with_index, enum) unless block_given?
  enum.map.with_index { |e, i| Thread.new { yield e, i } }.map(&:value)
end

def pmap(enum)
  return to_enum(:pmap, enum) unless block_given?
  pmap_with_index(enum).sort_by { |_, i| i }.map { |e, _| yield e }
end
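
One thing worth noting about the version above: pmap_with_index is called there without a block, so the caller's block runs in the final map on the calling thread, one element at a time. If the goal is to keep the block inside the threads and still get ordered results, a minimal sketch might look like the following. The name ordered_pmap is hypothetical, and the sketch assumes the block-less form only needs to enumerate the elements in input order.

```ruby
# Hypothetical alternative, not the gist author's method.
def ordered_pmap(enum)
  # Assumption: without a block, just enumerate elements in input order,
  # yielding from the calling thread so no reordering can occur.
  return Enumerator.new { |y| enum.each { |e| y << e } } unless block_given?

  # Each block call runs in its own thread; .value is read in creation order.
  enum.map { |e| Thread.new { yield e } }.map(&:value)
end

main = Thread.current
ran_off_main = Queue.new # records whether each block call left the main thread

doubled = ordered_pmap(1..5) do |e|
  ran_off_main << (Thread.current != main)
  e * 2
end

off_main = Array.new(ran_off_main.size) { ran_off_main.pop }

p doubled                  # [2, 4, 6, 8, 10]
p ordered_pmap(1..5).to_a  # [1, 2, 3, 4, 5]
p off_main.all?            # true: every block call ran in a spawned thread
```

The trade-off is that the block-less form does no threaded work at all; it only guarantees ordering.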
