@djellemah
Last active March 19, 2016 11:07
Enumerable#pmap and Enumerable#pfind using SizedQueue and a simple future
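
The `waitron` file is not included here. As a rough idea of what "a simple future" could look like, here is a minimal sketch of a write-once future exposing the same `_` / `_=` interface the code uses. The class name `SimpleFuture` and every detail of its behaviour are assumptions for illustration (in particular that a second write raises, and that a stored exception is returned rather than re-raised); the real Waitron may differ.

```ruby
# Hypothetical stand-in for Waitron, NOT the real implementation.
class SimpleFuture
  def initialize
    @mutex = Mutex.new
    @ready = ConditionVariable.new
    @resolved = false
    @value = nil
  end

  # Store the value and wake every waiting reader.
  # Assumption: writing a second time raises.
  def _=(value)
    @mutex.synchronize do
      raise ArgumentError, 'future already resolved' if @resolved
      @value = value
      @resolved = true
      @ready.broadcast
    end
  end

  # Block until a value has been written, then return it.
  def _
    @mutex.synchronize do
      @ready.wait(@mutex) until @resolved
      @value
    end
  end
end

future = SimpleFuture.new
producer = Thread.new do
  sleep 0.05
  future._ = 42
end
result = future._ # blocks until the producer thread writes
producer.join
puts result
```

Raising on a second write matches how pfind appears to use its future (the first writer wins, later writers get an exception), but that reading is inferred from the comments rather than confirmed.
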
require_relative 'waitron' # Waitron is a simple future

module Enumerable
  # Return the first item for which bloc returns a truthy value.
  # If several items match, which one is returned is nondeterministic.
  # Threads help most when bloc spends its time waiting (e.g. on IO), because
  # in MRI the GVL keeps threads from running Ruby code in parallel.
  # bloc must be careful with shared mutable state.
  # Caveat: a nil or false item ends a worker's loop early, because a falsy
  # pop is treated as the end of the queue.
  def pfind( threads: 4, &bloc )
    q = SizedQueue.new threads
    found_item = Waitron.new # Waitron is a simple future

    # worker threads
    ts = threads.times.map do
      Thread.new do
        while item = q.pop
          begin
            found_item._ = item if bloc.call item
          rescue Exception
            # yes, we really want to catch all catchable exceptions otherwise
            # they get swallowed by the thread and become invisible to caller
            # of pfind.
            found_item._ = $!
          end
        end
      end
    end

    Thread.new do
      # enq will raise ClosedQueueError if something has already found
      # the item and the queue has been closed
      each &q.method(:enq)
      q.close # signal worker threads that we're finished
      ts.each &:join # wait for threads cos one of them might still find the item
      # All worker threads finished. exception raised (and ignored by this
      # thread) if item found in the meantime.
      found_item._ = nil
    end

    # wait for the item/nil/exception
    found_item._
  ensure
    # Tell the workers that an item has been found. Could also do this inside
    # worker threads which would stop processing slightly earlier.
    q.close
  end

  # Order-preserving parallel map using futures as a back channel.
  # Returns a collection of futures, each containing the mapped value
  # (or the StandardError raised while computing it).
  # bloc must be careful with shared mutable state.
  def pmap( threads: 4, &bloc )
    q = SizedQueue.new threads

    ts = threads.times.map do
      Thread.new do
        loop do
          # pop returns nil once the queue is closed and empty
          (future, item = q.pop) or raise StopIteration
          # parenthesised so that a raised exception is what gets stored in the future
          future._ = (bloc.call(item) rescue $!)
        end
      end
    end

    enum = map
    # evaluates to $!.result when StopIteration is raised, which will contain
    # the set of values from enum.feed, ie the futures.
    loop do
      q << [future = Waitron.new, enum.next] # Waitron is a simple future
      enum.feed future
    end
  ensure
    q.close
    # wait for mapping to finish. Could also do this by forcing the futures after loop
    ts.each &:join
  end
end
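
Both methods lean on how a closed SizedQueue behaves: `pop` keeps returning queued items and then returns nil once the queue is drained, while pushing to a closed queue raises ClosedQueueError. The following is a small standalone sketch of that shutdown protocol (Ruby 2.3 or later). The variable names and the `item * 10` work are made up for illustration and it does not use pfind or pmap themselves.

```ruby
q = SizedQueue.new 2
results = Queue.new

workers = 2.times.map do
  Thread.new do
    # pop returns nil once the queue is closed and drained, ending the loop
    while (item = q.pop)
      results << item * 10
    end
  end
end

[1, 2, 3, 4].each { |i| q << i } # blocks while 2 items are already pending
q.close                          # no more work: lets the workers finish
workers.each(&:join)

collected = []
collected << results.pop until results.empty?

# Pushing after close raises, which is how a feeder thread finds out
# that the queue was shut down underneath it.
push_error = begin
  q << 5
  nil
rescue ClosedQueueError => e
  e
end

puts collected.sort.inspect
puts push_error.class
```

Results arrive in whatever order the workers finish, so they are sorted before printing; pmap avoids that problem by handing each item its own future.
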