secret
Last active

absolutely no warranty.

  • Download Gist
scatter_enum.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
require 'thread'
require 'timeout'
 
# Runs several callables concurrently (one thread per callable) and exposes
# their outcomes as an Enumerable, yielded in the order the callables finish.
#
# A callable that raises does not abort the enumeration: the exception object
# itself is yielded in place of a return value.
class Scatter
  include Enumerable

  # Seconds a single enumeration may take when no timeout is supplied.
  DEFAULT_TIMEOUT = 60

  # @param callables [Array<#call>] objects invoked with no arguments, each in
  #   its own thread.
  # @param timeout [Numeric, nil] upper bound, in seconds, for one complete
  #   enumeration (passed straight to Timeout.timeout).
  def initialize(*callables, timeout: DEFAULT_TIMEOUT)
    @callables = callables
    @timeout = timeout
  end

  # Starts every callable and yields each outcome as soon as it is available.
  #
  # All state (result queue, worker threads) is local to this call, so a
  # second enumeration never observes results left over from an earlier one
  # that was abandoned part-way (e.g. by #first_successful or Enumerator#rewind).
  #
  # The timeout window covers the whole loop, including the time spent inside
  # the caller's block.
  #
  # @yieldparam outcome [Object, Exception] a callable's return value, or the
  #   exception it raised.
  # @return [Enumerator] when no block is given.
  # @raise [Timeout::Error] when the outcomes do not all arrive in time.
  def each
    return enum_for(:each) unless block_given?

    results = Queue.new
    workers = []
    begin
      callables.each { |callable| workers << spawn_worker(callable, results) }
      Timeout.timeout(timeout) do
        callables.length.times { yield results.deq }
      end
    ensure
      # Reached on normal completion, timeout, and early exit (break / find):
      # stop whatever is still running so no thread outlives the enumeration.
      workers.each(&:kill)
    end
  end

  # @return [Object, nil] the first outcome that is not an Exception, or nil
  #   when every callable raised.
  # @raise [Timeout::Error] when no such outcome arrives before the timeout.
  def first_successful
    find { |entry| !entry.is_a?(Exception) }
  end

  private

  attr_reader :callables, :timeout

  # Runs +callable+ in a new thread and pushes its outcome onto +results+.
  def spawn_worker(callable, results)
    Thread.new do
      outcome =
        begin
          callable.call
        rescue Exception => error # rubocop:disable Lint/RescueException
          # Deliberately broad: the consumer waits for exactly one entry per
          # callable, so any failure has to be delivered as a value.
          error
        end
      # Not in an `ensure`: a worker killed during cleanup must not enqueue.
      results.enq(outcome)
    end
  end
end
 
# Demo callables. Three of them sleep for a different number of seconds and
# then return their own name; `alpha` fails immediately.
beta = proc { sleep 3; :beta }
gamma = proc { sleep 30; :gamma }
# `raise` is given a Symbol here, which is not an exception class/object, so
# this callable fails with the TypeError seen in the recorded output below.
alpha = proc { raise :alpha }
theta = proc { sleep 1; :theta }

# Returns once the first successful block completes.
# Great for working with unreliable IO.
Scatter.new(alpha, beta, gamma, theta).first_successful

# You can also get direct access to the enumerator.
# The `# =>` lines are the output as recorded in the original session, kept
# as comments so that this file remains valid Ruby.
enum = Scatter.new(alpha, beta, gamma, theta).to_enum
enum.next
# => #<TypeError: exception class/object expected>
enum.next
# => :theta
enum.next
# => :beta
enum.next
# => :gamma
enum.rewind
enum.next
# => #<TypeError: exception class/object expected>
enum.next
# => :theta
enum.next
# => :gamma
# NOTE(review): :beta is missing from this second pass in the recorded
# session -- presumably a timing artefact; confirm before relying on it.
enum.rewind
enum.take(4)
# => [#<TypeError: exception class/object expected>, :theta, :beta, :gamma]

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.