Skip to content

Instantly share code, notes, and snippets.

@christian-schulze
Forked from spiegela/gist:4712000
Last active February 7, 2016 07:46
Show Gist options
  • Save christian-schulze/44e0dde4e9e8d824f546 to your computer and use it in GitHub Desktop.
Save christian-schulze/44e0dde4e9e8d824f546 to your computer and use it in GitHub Desktop.
JRuby & Akka Example
#!/usr/bin/env ruby
require 'java'
require 'scala-library.jar'
require 'config-1.0.0.jar'
require 'akka-actor_2.10-2.1.0.jar'
java_import 'java.io.Serializable'
java_import 'akka.actor.UntypedActor'
java_import 'akka.actor.ActorRef'
java_import 'akka.actor.ActorSystem'
java_import 'akka.actor.Props'
java_import 'akka.actor.UntypedActorFactory'
java_import 'akka.routing.RoundRobinRouter'
java_import 'java.lang.System'
java_import 'scala.concurrent.duration.Duration'
java_import 'java.util.concurrent.TimeUnit'
# Wrapper for calculate message
class Calculate; end
# Wrapper for work message
class Work
attr_reader :start, :no_elements
def initialize(start, no_elements)
@start = start
@no_elements = no_elements
end
end
# Wrapper for result
class Result
attr_reader :value
def initialize(value)
@value = value
end
end
# Wrapper for final result
class PiApproximation
attr_reader :pi, :duration
def initialize(pi, duration)
@pi = pi
@duration = duration
end
end
# The actual worker
class Worker < UntypedActor
class << self
alias_method :apply, :new
alias_method :create, :new
end
def calculate_for_pi(start, no_elements)
start_elem = start * no_elements
end_elem = (start + 1) * no_elements - 1
# calculation algorithm
(start_elem..end_elem).reduce(0.0) do |result, element|
result + (4.0 * (1 - (element % 2) * 2) / (2 * element + 1))
end
end
def onReceive(message)
if message.is_a?(Work)
result = calculate_for_pi(message.start, message.no_elements)
# send the result message back to the sender
get_sender.tell(Result.new(result), get_self)
else
unhandled(message)
end
end
end
class Master < UntypedActor
attr_accessor :start, :no_workers, :no_chunks, :no_elements, :listener, :pi, :no_results
class << self
alias_method :apply, :new
alias_method :create, :new
end
def init_worker
props = Props.new(Worker).with_router(RoundRobinRouter.new no_workers)
@worker_router = self.get_context.actorOf(props, "workerRouter")
end
def onReceive(message)
case message
when Calculate
(0..no_chunks).each do |number|
@worker_router.tell(Work.new(number, @no_elements), get_self)
end
when Result
result = message
@pi = @pi + result.value
@no_results += 1
if @no_results == @no_chunks
duration = Duration.create(System.current_time_millis - @start, TimeUnit::MILLISECONDS)
@listener.tell(PiApproximation.new(@pi, duration), get_self)
get_context.stop(get_self)
end
default
unhandled message
end
end
end
class Listener < UntypedActor
class << self
alias_method :apply, :new
alias_method :create, :new
end
def onReceive(message)
if message.is_a?(PiApproximation)
puts "Value of Pi is: #{message.pi}"
puts "Duration of calculation is: #{message.duration}"
else
unhandled(message)
end
get_context.system.shutdown
end
end
class MasterFactory
include UntypedActorFactory
def initialize(listener)
@@listener = listener
end
def create
self.class.create
end
def self.create
master = Master.new
master.no_workers = 8
master.no_chunks = 10000
master.no_elements = 10000
master.listener = @@listener
master.start = System.current_time_millis
master.pi = 0
master.no_results = 0
master.init_worker
return master
end
end
system = ActorSystem.create("PiSystem")
listener = system.actorOf(Props.new(Listener), "listener")
master_props = Props.new(MasterFactory.new(listener))
master = system.actorOf(master_props, "master")
master.tell Calculate.new
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment