Skip to content

@stefanpenner /scatter_enum.rb secret
Last active

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
absolutely no warranty.
require 'thread'
require 'timeout'
class Scatter
include Enumerable
def initialize(*callables)
@threads = []
@callables = callables
@queue = Queue.new
@timeout = 60
end
def each(&block)
prepare
enumerate(&block)
ensure
cleanup
end
def first_successful
detect { |entry| !entry.is_a?(Exception) }
end
private
attr_reader :callables, :timeout, :queue
attr_accessor :threads
def cleanup
@threads.map(&:kill)
end
def enumerate
Timeout::timeout(timeout) do
callables.length.times do
yield queue.deq
end
end
end
def prepare
threads = callables.map do |callable|
Thread.new do
begin
message = callable.call
rescue Exception => error
ensure
queue.enq(error or message)
end
end
end
end
end
beta = proc { sleep 3; :beta }
gamma = proc { sleep 30; :gamma }
alpha = proc { raise :alpha }
theta = proc { sleep 1; :theta }
# returns once the first successful block completes.
# Great for working with unreliable IO
Scatter.new(alpha, beta, gamma, theta).first_successful
# you can also get direct access to the enumerator
enum = Scatter.new(alpha, beta, gamma, theta).to_enum
enum.next
> #<TypeError: exception class/object expected>
enum.next
> :theta
enum.next
> :beta
enum.next
> :gamma
enum.rewind
enum.next
> #<TypeError: exception class/object expected>
enum.next
> :theta
enum.next
> :gamma
enum.rewind
enum.take(4)
> [#<TypeError: exception class/object expected>, :theta, :beta, :gamma]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.