Skip to content

Instantly share code, notes, and snippets.

@enebo
Created June 7, 2011 21:29
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save enebo/1013217 to your computer and use it in GitHub Desktop.
Save enebo/1013217 to your computer and use it in GitHub Desktop.
pi.rb
require "java"
$: << File.join(File.dirname(__FILE__), 'lib')
require 'scala-library'
require 'akka/akka-actor-1.1.2'
java_import 'akka.actor.Actors'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.CyclicIterator'
java_import 'akka.routing.InfiniteIterator'
java_import 'akka.routing.Routing'
java_import 'akka.routing.UntypedLoadBalancer'
java_import java.lang.System
java_import java.util.concurrent.CountDownLatch
def actorOf(&code)
Actors.actorOf(Class.new do
include UntypedActorFactory
define_method(:create) { |*args| code[*args] }
end.new)
end
class Calculate
def perform(master)
router = master.router
# schedule work
master.nMessages.times do |i|
first, last = i * master.nElements, (i+1) * master.nElements
router.sendOneWay PiWorker.new(first, last), master.context
end
# send a PoisonPill to all workers telling them to shut down themselves
router.sendOneWay(Routing::Broadcast.new(Actors.poisonPill))
# send a PoisonPill to the router, telling him to shut himself down
router.sendOneWay Actors.poisonPill
end
end
# Calculate Part of Pi
class PiWorker
def initialize(first, last)
@first, @last = first, last
end
def perform
(@first...@last).inject(0.0) do |acc, i|
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
end
end
end
class Result
def initialize(value)
@value = value
end
def perform(master)
master.pi += @value
master.nResults += 1
master.context.stop if master.nResults == master.nMessages
end
end
class Worker < UntypedActor
# needed by Actors.actorOf
def self.create(*args)
new *args
end
# message handler
def onReceive(work)
unless work.respond_to? :perform
raise ArgumentError.new "Unknown message: #{work}"
end
# perform the work and reply with the result
context.reply_unsafe Result.new(work.perform)
end
end
class PiRouter < UntypedLoadBalancer
attr_reader :seq
def initialize(workers)
super()
@seq = CyclicIterator.new(workers)
end
end
class Master < UntypedActor
attr_accessor :pi, :nResults
attr_reader :router, :nElements, :nMessages
def initialize(nWorkers, nMessages, nElements, latch)
super()
@nMessages, @nElements, @latch = nMessages, nElements, latch
@nResults, @pi = 0, 0.0
workers = Array.new(nWorkers) { Actors.actorOf(Worker).start }
# wrap them with a load-balancing router
@router = actorOf { PiRouter.new(workers) }.start
end
# message handler
def onReceive(message)
unless message.respond_to? :perform
raise ArgumentError.new "Unknown message [#{message}]"
end
message.perform self
end
def preStart
@start = Time.now
end
def postStop
puts "\n\tPi: \t\t#{@pi}\n\tTime taken: \t#{Time.now - @start} ms",
@latch.countDown
end
end
class Pi
def self.calculate(nWorkers, nElements, nMessages)
latch = CountDownLatch.new 1
master = Actors.actorOf { Master.new nWorkers, nMessages, nElements, latch }.start
master.send_one_way Calculate.new # start the calculation
latch.await # wait for master to shut down
end
end
Pi.calculate(4, 10000, 10000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment