@dmcclory
Created February 19, 2014 01:35
require 'rinda/tuplespace'

# Workers are initialized with a Rinda::TupleSpace
# (which we can use as a stand-in for Redis).
class MapReduceWorker
  def initialize(tuplespace)
    @ts = tuplespace
    @name = "MapReduceJob"
  end

  def work
    # Non-blocking take: with a timeout of 0, Rinda raises
    # Rinda::RequestExpiredError when no matching tuple is waiting
    # (for example, another worker already took the job).
    job = @ts.take([@name, nil], 0).last
    puts "Performing a MapReduceJob!"
    result = job[:data].inject(&job[:method])
    puts "computed result: #{result}"
  rescue Rinda::RequestExpiredError
    nil
  end
end

# bin/caribou is responsible for initializing workers and managing the TupleSpace
# (the constant is named DRB_URI so it does not clash with Ruby's stdlib URI module)
DRB_URI = "druby://localhost:7777"
ts = Rinda::TupleSpace.new

# initialize all the workers, three steps:
# 1. create a worker
m = MapReduceWorker.new(ts)
# 2. create an observer which will fire when
#    a compatible item is added to the TupleSpace
observer = ts.notify 'write', ['MapReduceJob', nil]
# 3. kick off a thread with a block that calls worker.work
#    whenever something is published to the TupleSpace
Thread.start do
  observer.each { |t| m.work }
end

DRb.start_service(DRB_URI, ts)
DRb.thread.join

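# then another process has to write Jobs to the tuplespace
#
# NOTE: the worker above matches tuples of the form ['MapReduceJob', nil],
# so enqueue presumably has to write the job under that name.
Caribou.connect("druby://localhost:7777")
def some_controller
  #...
  Caribou.enqueue([MapReduceWorker, {data: [1,2,3,4,5], method: :+ }])
end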
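
The flow above (register an observer, write a job tuple, let the worker take it and reduce the data) can be tried without DRb or the Caribou client. The following is a minimal in-process sketch, not the author's implementation: the `results` queue and the inline worker thread are assumptions added only so the main thread can read the outcome, and the producer writes directly to the TupleSpace instead of going through `Caribou.enqueue`.

```ruby
require 'rinda/tuplespace'

# In-process sketch: no DRb server, one TupleSpace shared by two threads.
ts = Rinda::TupleSpace.new
results = Queue.new # illustration only: hands the result back to the main thread

# Register the observer before writing so the 'write' event is not missed.
observer = ts.notify('write', ['MapReduceJob', nil])

Thread.new do
  # Each notification is an [event, tuple] pair.
  observer.each do |_event, _tuple|
    begin
      # Timeout 0: raise instead of blocking if the job is already gone.
      job = ts.take(['MapReduceJob', nil], 0).last
      results << job[:data].inject(job[:method])
    rescue Rinda::RequestExpiredError
      # Another worker took the job first; wait for the next notification.
    end
  end
end

# Producer side: write a tuple that matches the worker's template.
ts.write(['MapReduceJob', { data: [1, 2, 3, 4, 5], method: :+ }])

result = results.pop # blocks until the worker thread has pushed a value
puts "computed result: #{result}"
```

To move the producer into a separate process, one option is to reach the served TupleSpace through `DRbObject.new_with_uri` wrapped in a `Rinda::TupleSpaceProxy` and call `write` on that; whether `Caribou.enqueue` works this way is not stated in the source.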