Created — forked from enebo/gist:1013217

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

Pi approximation program: a JRuby port of Akka's first Java tutorial, found at http://akka.io/docs/akka/1.1.2/intro/getting-started-first-java.html

View pi.rb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
require "java"
 
$: << File.join(File.dirname(__FILE__), 'lib')
 
require 'scala-library'
require 'akka/akka-actor-1.1.2'
 
java_import 'akka.actor.Actors'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.CyclicIterator'
java_import 'akka.routing.InfiniteIterator'
java_import 'akka.routing.Routing'
java_import 'akka.routing.UntypedLoadBalancer'
java_import java.util.concurrent.CountDownLatch
 
# Creates an actor through Actors.actorOf from a Ruby block.
#
# The block is wrapped in an anonymous class that mixes in
# UntypedActorFactory; the factory's +create+ forwards its arguments to the
# block and returns whatever the block returns.
def actorOf(&code)
  factory_class = Class.new do
    include UntypedActorFactory

    # define_method (rather than def) keeps +code+ visible via the closure.
    define_method(:create) { |*args| code.call(*args) }
  end

  Actors.actorOf(factory_class.new)
end
 
# Marker message with no payload; Master starts scheduling work when it
# receives an instance of this class.
class Calculate
end
# Message describing one chunk of the series to sum: +start+ is the chunk
# index and +nrOfElements+ is the number of terms in the chunk.
# Assigned directly rather than subclassed so that no extra anonymous Struct
# class sits between Work and Struct.
Work = Struct.new(:start, :nrOfElements)
# Message carrying a partial sum (+value+) back from a worker.
# Assigned directly rather than subclassed so that no extra anonymous Struct
# class sits between Result and Struct.
Result = Struct.new(:value)
 
# Actor that sums one chunk of the pi series for every Work message it
# receives and replies with the partial sum wrapped in a Result.
class Worker < UntypedActor
  # Class-level constructor hook (the original author notes it is needed by
  # actorOf). Forwards all arguments to +new+.
  def self.create(*args)
    new(*args)
  end

  # Sums the terms 4 * (-1)^i / (2i + 1) for i in the half-open range
  # [start * nrOfElements, (start + 1) * nrOfElements).
  #
  # @param start [Integer] chunk index
  # @param nrOfElements [Integer] number of terms per chunk
  # @return [Numeric] the partial sum (Integer 0 when the range is empty)
  def calculatePiFor(start, nrOfElements)
    first_term = start * nrOfElements
    end_term = (start + 1) * nrOfElements

    (first_term...end_term).inject(0) do |acc, i|
      # (1 - (i mod 2) * 2) is +1 for even i and -1 for odd i.
      acc + 4.0 * (1 - i.modulo(2) * 2) / (2 * i + 1)
    end
  end

  # Message handler.
  #
  # @param message [Object] expected to be a Work
  # @raise [java.lang.IllegalArgumentException] for any other message type
  def onReceive(message)
    unless message.kind_of?(Work)
      # Fully qualified because the file never java_imports this class; the
      # message is interpolated as-is (the original referenced an undefined
      # local here and failed with NameError instead).
      raise java.lang.IllegalArgumentException.new("Unknown message [#{message}]")
    end

    # perform the work
    result = calculatePiFor(message.start, message.nrOfElements)

    # reply with the result
    context.replyUnsafe(Result.new(result))
  end
end
 
# Load balancer that hands messages to the given workers via a
# CyclicIterator.
class PiRouter < UntypedLoadBalancer
  # @param workers [java.util.List] the worker actor refs to iterate over
  def initialize(workers)
    super()
    @seq = CyclicIterator.new(workers)
  end

  # @return [CyclicIterator] the iterator built over the workers
  def seq
    @seq
  end
end
 
# Actor that fans the calculation out to a pool of workers, accumulates their
# partial results, and releases a latch once it has been stopped.
class Master < UntypedActor
  # @param nrOfWorkers [Integer] number of Worker actors to start
  # @param nrOfMessages [Integer] number of Work messages to send
  # @param nrOfElements [Integer] number of series terms per Work message
  # @param latch [java.util.concurrent.CountDownLatch] counted down in postStop
  def initialize(nrOfWorkers, nrOfMessages, nrOfElements, latch)
    super()
    @nrOfMessages, @nrOfElements, @latch = nrOfMessages, nrOfElements, latch
    @nrOfResults, @pi = 0, 0.0

    # create the workers
    workers = java.util.ArrayList.new
    nrOfWorkers.times { workers << Actors.actorOf(Worker).start }

    # wrap them with a load-balancing router
    @router = actorOf { PiRouter.new(workers) }.start
  end

  # Message handler.
  #
  # On Calculate: sends one Work message per chunk through the router, then
  # poison pills for the workers and the router.
  # On Result: adds the value to the running total and stops this actor once
  # as many results as messages have arrived.
  #
  # @param message [Object] a Calculate or a Result
  # @raise [java.lang.IllegalArgumentException] for any other message type
  def onReceive(message)
    if message.kind_of? Calculate
      # schedule work
      @nrOfMessages.times do |start|
        @router.sendOneWay(Work.new(start, @nrOfElements), context)
      end

      # send a PoisonPill to all workers telling them to shut down themselves
      @router.sendOneWay(Routing::Broadcast.new(Actors.poisonPill))

      # send a PoisonPill to the router, telling him to shut himself down
      @router.sendOneWay Actors.poisonPill
    elsif message.kind_of? Result # handle result from the worker
      @pi += message.value
      @nrOfResults += 1
      context.stop if @nrOfResults == @nrOfMessages
    else
      # Fully qualified because the file never java_imports this class.
      raise java.lang.IllegalArgumentException.new("Unknown message [#{message}]")
    end
  end

  # Records the start time in milliseconds, used by postStop for the elapsed
  # time.
  def preStart
    @start = java.lang.System.currentTimeMillis
  end

  # Prints the estimate and the elapsed time, then counts the latch down.
  def postStop
    # tell the world that the calculation is complete
    puts format("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
                @pi, (java.lang.System.currentTimeMillis - @start))
    @latch.countDown
  end
end
 
 
# Entry point for the pi approximation.
class Pi
  class << self
    # Starts a Master actor, sends it a Calculate message and blocks until the
    # latch handed to the master has been counted down.
    #
    # @param nrOfWorkers [Integer] number of worker actors
    # @param nrOfElements [Integer] series terms per work message
    # @param nrOfMessages [Integer] number of work messages
    def calculate(nrOfWorkers, nrOfElements, nrOfMessages)
      # The latch is only plumbing to learn when the calculation is finished.
      done = CountDownLatch.new(1)

      # Create the master, then start it.
      master_ref = Actors.actorOf { Master.new(nrOfWorkers, nrOfMessages, nrOfElements, done) }
      master = master_ref.start

      # Kick off the calculation and wait for the master to shut down.
      master.sendOneWay(Calculate.new)
      done.await
    end
  end
end
 
# Run the approximation: 4 workers, 1000 elements per work message,
# 1000 work messages.
Pi.calculate(4, 1000, 1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.