Create a gist now

Instantly share code, notes, and snippets.

Pi approximation program, Akka's first Java tutorial found on http://akka.io/docs/akka/1.1.2/intro/getting-started-first-java.html
require "java"
$: << File.join(File.dirname(__FILE__), 'lib')
require 'scala-library'
require 'akka/akka-actor-1.1.2'
java_import 'akka.actor.Actors'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.CyclicIterator'
java_import 'akka.routing.InfiniteIterator'
java_import 'akka.routing.Routing'
java_import 'akka.routing.UntypedLoadBalancer'
java_import java.util.concurrent.CountDownLatch
def actorOf(&code)
Actors.actorOf(Class.new do
include UntypedActorFactory
define_method(:create) do |*args|
code[*args]
end
end.new)
end
class Calculate; end
class Work < Struct.new(:start, :nrOfElements); end
class Result < Struct.new(:value); end
class Worker < UntypedActor
# needed by actorOf
def self.create(*args)
new *args
end
# define the work
def calculatePiFor(start, nrOfElements)
((start * nrOfElements)...((start + 1) * nrOfElements)).inject(0) do |acc, i|
acc + 4.0 * (1 - (i.modulo 2) * 2) / (2 * i + 1)
end
end
# message handler
def onReceive(message)
if message.kind_of? Work
work = message
# perform the work
result = calculatePiFor(work.start, work.nrOfElements)
# reply with the result
context.replyUnsafe(Result.new(result))
else
raise IllegalArgumentException.new "Unknown message [#{message + b}]"
end
end
end
class PiRouter < UntypedLoadBalancer
attr_reader :seq
def initialize(workers)
super()
@seq = CyclicIterator.new(workers)
end
end
class Master < UntypedActor
def initialize(nrOfWorkers, nrOfMessages, nrOfElements, latch)
super()
@nrOfMessages, @nrOfElements, @latch = nrOfMessages, nrOfElements, latch
@nrOfResults, @pi = 0, 0.0
# create the workers
workers = java.util.ArrayList.new
nrOfWorkers.times { workers << Actors.actorOf(Worker).start }
# wrap them with a load-balancing router
@router = actorOf { PiRouter.new(workers) }.start
end
# message handler
def onReceive(message)
if message.kind_of? Calculate
# schedule work
@nrOfMessages.times do |start|
@router.sendOneWay(Work.new(start, @nrOfElements), context)
end
# send a PoisonPill to all workers telling them to shut down themselves
@router.sendOneWay(Routing::Broadcast.new(Actors.poisonPill))
# send a PoisonPill to the router, telling him to shut himself down
@router.sendOneWay Actors.poisonPill
elsif message.kind_of? Result # handle result from the worker
@pi += message.value
@nrOfResults += 1
context.stop if @nrOfResults == @nrOfMessages
else
raise IllegalArgumentException.new "Unknown message [#{message}]"
end
end
def preStart
@start = java.lang.System.currentTimeMillis
end
def postStop
# tell the world that the calculation is complete
puts format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
@pi, (java.lang.System.currentTimeMillis - @start))
@latch.countDown
end
end
class Pi
def self.calculate(nrOfWorkers, nrOfElements, nrOfMessages)
# this latch is only plumbing to know when the calculation is completed
latch = CountDownLatch.new(1)
# create the master
master = Actors.actorOf do
Master.new(nrOfWorkers, nrOfMessages, nrOfElements, latch)
end.start
master.sendOneWay(Calculate.new) # start the calculation
latch.await # wait for master to shut down
end
end
Pi.calculate(4, 1000, 1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment