@aflatter
Created May 8, 2013 00:52
The FuturePlexer for Celluloid :-)
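require 'celluloid'

# Waits on a set of Celluloid futures and hands back results as they complete.
class FuturePlexer
  class DrainedError < StandardError; end

  attr_accessor :futures

  def initialize(futures)
    self.futures = futures
  end

  # Waits up to `timeout` seconds for a Future::Result message that belongs to
  # one of the tracked futures, then returns the values of the futures found
  # ready. If a block is given, each value is passed through it.
  def select(timeout = nil, &block)
    raise DrainedError, 'No futures left.' if drained?

    Celluloid.receive(timeout) do |msg|
      msg.is_a?(Celluloid::Future::Result) && futures.include?(msg.future)
    end

    results = []
    # NOTE: deleting from `futures` while iterating over it shifts the
    # remaining elements, so the future right after a deleted one is skipped
    # in this pass and is only picked up by a later call to #select.
    futures.each do |future|
      if future.ready?
        results << future.value
        futures.delete(future)
      end
    end

    if block_given?
      results.map(&block)
    else
      results
    end
  end

  def drained?
    futures.empty?
  end
end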

class MyActor
  include Celluloid
  include Celluloid::Logger

  def initialize
    # Start ten tasks as futures, then collect their results asynchronously.
    futures = 10.times.map do |i|
      future.work(i)
    end

    async.wait_for_tasks(futures)
  end

  def wait_for_tasks(futures)
    plexer = FuturePlexer.new(futures)

    until plexer.drained?
      begin
        plexer.select(0.150) do |result|
          info("A task finished: #{result}")
        end
      rescue Celluloid::TimeoutError
        info("Plexer doesn't have a result for us.")
      rescue FuturePlexer::DrainedError
        info("Plexer is already drained.")
      end
    end

    info("All tasks have finished.")
  end

  def work(i)
    # Simulate work lasting between 0.100 and 0.199 seconds.
    time_to_work = (100 + rand(100)) / 1000.0
    info("Task #{i} is going to take #{time_to_work}s.")
    sleep(time_to_work)
    info("Task #{i} is ready.")
    i
  end
end

MyActor.new
sleep # keep the main thread alive while the actor works

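One possible way to collect every ready future in a single pass is to split the list with `Array#partition` instead of deleting during `each`. This is only a sketch under stated assumptions, not part of the gist: `FakeFuture` and `take_ready` are hypothetical names, and `FakeFuture` models nothing beyond the `ready?` and `value` calls that `FuturePlexer#select` uses.

```ruby
# Hypothetical stand-in for a Celluloid future; only #ready? and #value exist.
FakeFuture = Struct.new(:value, :ready) do
  def ready?
    ready
  end
end

# Hypothetical helper: returns the values of all ready futures and leaves
# only the pending ones in the array that was passed in.
def take_ready(futures)
  ready, pending = futures.partition(&:ready?)
  futures.replace(pending) # mutate in place, but only after iteration is done
  ready.map(&:value)
end

# Five fake futures; every one except number 3 is ready.
futures = 5.times.map { |i| FakeFuture.new(i, i != 3) }
results = take_ready(futures)

p results                # prints [0, 1, 2, 4]
p futures.map(&:value)   # prints [3]
```

Because the array is replaced only after `partition` has finished, no element is skipped, so a helper like this would report each ready future on the first pass that sees it.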
$ bundle exec ruby future_plexer.rb
I, [2013-05-08T02:47:02.079739 #15586] INFO -- : Task 0 is going to take 0.133s.
I, [2013-05-08T02:47:02.079994 #15586] INFO -- : Task 1 is going to take 0.137s.
I, [2013-05-08T02:47:02.080098 #15586] INFO -- : Task 2 is going to take 0.131s.
I, [2013-05-08T02:47:02.080207 #15586] INFO -- : Task 3 is going to take 0.121s.
I, [2013-05-08T02:47:02.080303 #15586] INFO -- : Task 4 is going to take 0.101s.
I, [2013-05-08T02:47:02.080434 #15586] INFO -- : Task 5 is going to take 0.188s.
I, [2013-05-08T02:47:02.080572 #15586] INFO -- : Task 6 is going to take 0.122s.
I, [2013-05-08T02:47:02.080712 #15586] INFO -- : Task 7 is going to take 0.157s.
I, [2013-05-08T02:47:02.081376 #15586] INFO -- : Task 8 is going to take 0.176s.
I, [2013-05-08T02:47:02.081609 #15586] INFO -- : Task 9 is going to take 0.114s.
I, [2013-05-08T02:47:02.181815 #15586] INFO -- : Task 4 is ready.
I, [2013-05-08T02:47:02.197616 #15586] INFO -- : Task 9 is ready.
I, [2013-05-08T02:47:02.201827 #15586] INFO -- : Task 3 is ready.
I, [2013-05-08T02:47:02.201923 #15586] INFO -- : Task 6 is ready.
I, [2013-05-08T02:47:02.212361 #15586] INFO -- : Task 2 is ready.
I, [2013-05-08T02:47:02.212509 #15586] INFO -- : Task 0 is ready.
I, [2013-05-08T02:47:02.217423 #15586] INFO -- : Task 1 is ready.
I, [2013-05-08T02:47:02.232533 #15586] INFO -- : A task finished: 0
I, [2013-05-08T02:47:02.232621 #15586] INFO -- : A task finished: 2
I, [2013-05-08T02:47:02.232666 #15586] INFO -- : A task finished: 4
I, [2013-05-08T02:47:02.232708 #15586] INFO -- : A task finished: 6
I, [2013-05-08T02:47:02.232747 #15586] INFO -- : A task finished: 9
I, [2013-05-08T02:47:02.238984 #15586] INFO -- : Task 7 is ready.
I, [2013-05-08T02:47:02.257820 #15586] INFO -- : Task 8 is ready.
I, [2013-05-08T02:47:02.269687 #15586] INFO -- : Task 5 is ready.
I, [2013-05-08T02:47:02.383617 #15586] INFO -- : A task finished: 1
I, [2013-05-08T02:47:02.383698 #15586] INFO -- : A task finished: 5
I, [2013-05-08T02:47:02.383751 #15586] INFO -- : A task finished: 8
I, [2013-05-08T02:47:02.534242 #15586] INFO -- : A task finished: 3
I, [2013-05-08T02:47:02.685123 #15586] INFO -- : A task finished: 7
I, [2013-05-08T02:47:02.685209 #15586] INFO -- : All tasks have finished.
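The grouping of the "A task finished" lines above can be reproduced without Celluloid. The following is a minimal sketch, assuming plain integers stand in for the futures and that tasks 0, 1, 2, 3, 4, 6 and 9 were ready by the time of the first `select` (as the timestamps suggest). The names `ready_ids`, `pending` and `reported` are illustrative only. Deleting from an array while iterating over it with `each` skips the element that follows each deletion, which would explain why tasks 1 and 3 show up in later passes although they were already ready.

```ruby
# Task ids that the log shows as ready before the first "A task finished" line.
ready_ids = [0, 1, 2, 3, 4, 6, 9]

# Stand-in for the plexer's list of futures, in creation order.
pending = (0..9).to_a
reported = []

# Same shape as the loop in FuturePlexer#select: delete while iterating.
pending.each do |id|
  if ready_ids.include?(id)
    reported << id
    pending.delete(id) # shifts later elements left; the next one is skipped
  end
end

p reported # prints [0, 2, 4, 6, 9]
p pending  # prints [1, 3, 5, 7, 8]
```

The first list matches the five tasks reported at 02:47:02.232, and the leftover list matches the tasks reported in the later passes.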