Skip to content

Embed URL

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Pi approximation program, Akka's first Java tutorial found on http://akka.io/docs/akka/1.1.2/intro/getting-started-first-java.html
require "java"
$: << File.join(File.dirname(__FILE__), 'lib')
require 'scala-library'
require 'akka/akka-actor-1.1.2'
java_import 'akka.actor.Actors'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.CyclicIterator'
java_import 'akka.routing.InfiniteIterator'
java_import 'akka.routing.Routing'
java_import 'akka.routing.UntypedLoadBalancer'
java_import java.util.concurrent.CountDownLatch
def actorOf(&code)
Actors.actorOf(Class.new do
include UntypedActorFactory
define_method(:create) do |*args|
code[*args]
end
end.new)
end
class Calculate; end
class Work < Struct.new(:start, :nrOfElements); end
class Result < Struct.new(:value); end
class Worker < UntypedActor
# needed by actorOf
def self.create(*args)
new *args
end
# define the work
def calculatePiFor(start, nrOfElements)
((start * nrOfElements)...((start + 1) * nrOfElements)).inject(0) do |acc, i|
acc + 4.0 * (1 - (i.modulo 2) * 2) / (2 * i + 1)
end
end
# message handler
def onReceive(message)
if message.kind_of? Work
work = message
# perform the work
result = calculatePiFor(work.start, work.nrOfElements)
# reply with the result
context.replyUnsafe(Result.new(result))
else
raise IllegalArgumentException.new "Unknown message [#{message + b}]"
end
end
end
class PiRouter < UntypedLoadBalancer
attr_reader :seq
def initialize(workers)
super()
@seq = CyclicIterator.new(workers)
end
end
class Master < UntypedActor
def initialize(nrOfWorkers, nrOfMessages, nrOfElements, latch)
super()
@nrOfMessages, @nrOfElements, @latch = nrOfMessages, nrOfElements, latch
@nrOfResults, @pi = 0, 0.0
# create the workers
workers = java.util.ArrayList.new
nrOfWorkers.times { workers << Actors.actorOf(Worker).start }
# wrap them with a load-balancing router
@router = actorOf { PiRouter.new(workers) }.start
end
# message handler
def onReceive(message)
if message.kind_of? Calculate
# schedule work
@nrOfMessages.times do |start|
@router.sendOneWay(Work.new(start, @nrOfElements), context)
end
# send a PoisonPill to all workers telling them to shut down themselves
@router.sendOneWay(Routing::Broadcast.new(Actors.poisonPill))
# send a PoisonPill to the router, telling him to shut himself down
@router.sendOneWay Actors.poisonPill
elsif message.kind_of? Result # handle result from the worker
@pi += message.value
@nrOfResults += 1
context.stop if @nrOfResults == @nrOfMessages
else
raise IllegalArgumentException.new "Unknown message [#{message}]"
end
end
def preStart
@start = java.lang.System.currentTimeMillis
end
def postStop
# tell the world that the calculation is complete
puts format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
@pi, (java.lang.System.currentTimeMillis - @start))
@latch.countDown
end
end
class Pi
def self.calculate(nrOfWorkers, nrOfElements, nrOfMessages)
# this latch is only plumbing to know when the calculation is completed
latch = CountDownLatch.new(1)
# create the master
master = Actors.actorOf do
Master.new(nrOfWorkers, nrOfMessages, nrOfElements, latch)
end.start
master.sendOneWay(Calculate.new) # start the calculation
latch.await # wait for master to shut down
end
end
Pi.calculate(4, 1000, 1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.