public
Last active — forked from enebo/gist:1013217

Pi approximation program: a JRuby port of Akka's first Java tutorial, found at http://akka.io/docs/akka/1.1.2/intro/getting-started-first-java.html

  • Download Gist
pi.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
require "java"
 
$: << File.join(File.dirname(__FILE__), 'lib')
 
require 'scala-library'
require 'akka/akka-actor-1.1.2'
 
java_import 'akka.actor.Actors'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.CyclicIterator'
java_import 'akka.routing.InfiniteIterator'
java_import 'akka.routing.Routing'
java_import 'akka.routing.UntypedLoadBalancer'
java_import java.util.concurrent.CountDownLatch
 
# Wraps the given block in an anonymous UntypedActorFactory implementation
# and hands an instance of it to Actors.actorOf.
#
# The factory's +create+ method forwards whatever arguments it receives to
# the block and returns the block's result.
#
# @yield [*args] called each time the factory's +create+ is invoked
# @return whatever Actors.actorOf returns for the factory
def actorOf(&code)
  factory_class = Class.new do
    include UntypedActorFactory

    define_method(:create) { |*args| code.call(*args) }
  end

  Actors.actorOf(factory_class.new)
end
 
# Message types exchanged between the actors.

# Marker message: carries no data, only its type matters.
class Calculate; end

# A slice of work: +start+ is the slice index, +nrOfElements+ the slice length.
# Assigned directly from Struct.new (instead of subclassing an anonymous
# Struct) so no throwaway superclass is created and re-loading the file does
# not raise "superclass mismatch".
Work = Struct.new(:start, :nrOfElements)

# A result message wrapping a single +value+.
Result = Struct.new(:value)
 
# Actor that computes one slice of the series for pi and replies with the
# partial sum wrapped in a Result.
class Worker < UntypedActor
  # Factory hook that builds a new Worker from the given arguments
  # (the file notes this is needed by actorOf).
  def self.create(*args)
    new(*args)
  end

  # Sums the terms 4 * (-1)^i / (2i + 1) for every i in
  # [start * nrOfElements, (start + 1) * nrOfElements).
  #
  # @param start [Integer] zero-based index of the slice
  # @param nrOfElements [Integer] number of terms in the slice
  # @return [Float] the partial sum (0.0 for an empty slice)
  def calculatePiFor(start, nrOfElements)
    first = start * nrOfElements
    # Seed with 0.0 so an empty slice still yields a Float.
    (first...(first + nrOfElements)).inject(0.0) do |acc, i|
      acc + (i.even? ? 4.0 : -4.0) / (2 * i + 1)
    end
  end

  # Message handler: computes the slice described by a Work message and
  # replies with a Result.
  #
  # @param message the incoming message
  # @raise [java.lang.IllegalArgumentException] for any message that is not a Work
  def onReceive(message)
    unless message.kind_of?(Work)
      # Fully qualified: IllegalArgumentException is not java_import'ed in this file.
      raise java.lang.IllegalArgumentException.new("Unknown message [#{message}]")
    end

    # perform the work
    partial = calculatePiFor(message.start, message.nrOfElements)

    # reply with the result
    context.replyUnsafe(Result.new(partial))
  end
end
 
# Load balancer whose +seq+ is a CyclicIterator over the workers it was
# constructed with.
class PiRouter < UntypedLoadBalancer
  # @param workers the collection handed to CyclicIterator.new
  def initialize(workers)
    super()
    @seq = CyclicIterator.new(workers)
  end

  # @return the CyclicIterator built in the constructor
  def seq
    @seq
  end
end
 
# Actor that fans Work messages out to a pool of Workers through a PiRouter,
# accumulates the Result values into an estimate of pi, and releases a latch
# once it has stopped.
class Master < UntypedActor
  # @param nrOfWorkers [Integer] number of Worker actors to start
  # @param nrOfMessages [Integer] number of Work messages to send
  # @param nrOfElements [Integer] slice length carried by each Work message
  # @param latch object whose +countDown+ is called from postStop
  def initialize(nrOfWorkers, nrOfMessages, nrOfElements, latch)
    super()
    @nrOfMessages = nrOfMessages
    @nrOfElements = nrOfElements
    @latch = latch
    @nrOfResults = 0
    @pi = 0.0

    # create the workers
    workers = java.util.ArrayList.new
    nrOfWorkers.times { workers << Actors.actorOf(Worker).start }

    # wrap them with a load-balancing router
    @router = actorOf { PiRouter.new(workers) }.start
  end

  # Message handler.
  #
  # * Calculate: sends one Work message per slice to the router, then sends
  #   poison pills to the workers (via broadcast) and to the router itself.
  # * Result: adds the value to the running total and stops this actor once
  #   all expected results have arrived.
  #
  # @param message the incoming message
  # @raise [java.lang.IllegalArgumentException] for any other message type
  def onReceive(message)
    case message
    when Calculate
      # schedule work
      @nrOfMessages.times do |start|
        @router.sendOneWay(Work.new(start, @nrOfElements), context)
      end

      # send a PoisonPill to all workers, telling them to shut themselves down
      @router.sendOneWay(Routing::Broadcast.new(Actors.poisonPill))

      # send a PoisonPill to the router, telling it to shut itself down
      @router.sendOneWay(Actors.poisonPill)
    when Result
      # handle result from the worker
      @pi += message.value
      @nrOfResults += 1
      context.stop if @nrOfResults == @nrOfMessages
    else
      # Fully qualified: IllegalArgumentException is not java_import'ed in this file.
      raise java.lang.IllegalArgumentException.new("Unknown message [#{message}]")
    end
  end

  # Records the wall-clock time (in milliseconds) used later by postStop.
  def preStart
    @start = java.lang.System.currentTimeMillis
  end

  # Prints the estimate and the time elapsed since preStart, then counts the
  # latch down so that whoever awaits it can continue.
  def postStop
    # tell the world that the calculation is complete
    elapsed = java.lang.System.currentTimeMillis - @start
    puts format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis", @pi, elapsed)
    @latch.countDown
  end
end
 
 
# Entry point that wires a Master actor to a latch and blocks until the
# latch is released.
class Pi
  class << self
    # Starts a Master, sends it a Calculate message and waits on the latch.
    #
    # @param nrOfWorkers [Integer] passed to Master.new as the worker count
    # @param nrOfElements [Integer] passed to Master.new as the slice length
    # @param nrOfMessages [Integer] passed to Master.new as the message count
    # @return whatever the latch's +await+ returns
    def calculate(nrOfWorkers, nrOfElements, nrOfMessages)
      # the latch is only plumbing to know when the calculation has completed
      done = CountDownLatch.new(1)

      # create and start the master (note Master's argument order differs from ours)
      master_ref = Actors.actorOf { Master.new(nrOfWorkers, nrOfMessages, nrOfElements, done) }.start

      # kick off the calculation, then wait for the master to shut down
      master_ref.sendOneWay(Calculate.new)
      done.await
    end
  end
end
 
# Run the calculation: 4 workers, 1000 elements per message, 1000 messages.
Pi.calculate(4, 1000, 1000)

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.