require 'beanstalk-client.rb'
require 'ruby-debug'
DEFAULT_PORT = 11300
SERVER_IP = '127.0.0.1'
# beanstalkd orders jobs by priority; jobs with the same priority are served
# FIFO. A later example will use the priority explicitly
# (lower numbers mean higher priority).
DEFAULT_PRIORITY = 65536
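# A minimal sketch (not executed here) of how priority affects ordering,
# assuming `q` is a Beanstalk::Pool already using/watching a single tube:
#   q.yput("urgent job",  100,              0, TTR)
#   q.yput("routine job", DEFAULT_PRIORITY, 0, TTR)
#   q.reserve.ybody # => "urgent job" -- the lower priority number is served first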
# TTR (time to run) is how long, in seconds, a reserved job may be worked on
# before it reappears on the queue. If a worker dies before completing the
# work and never calls job.delete, the same job returns to the queue after
# TTR seconds.
TTR = 3
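# A sketch of what TTR protects against (not executed here):
#   job = queue.reserve
#   # ...worker crashes before job.delete is ever called...
# After TTR seconds beanstalkd makes the job ready again, so another worker
# can reserve and finish it.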
class BeanBase

  # To work with multiple queues you must tell beanstalk which tubes you plan
  # on writing to (use), and which tubes you will reserve jobs from (watch).
  # In this case we also want to ignore the default tube. You need a different
  # connection for each tube you plan on using, or you can keep switching what
  # one connection watches and uses; here we just keep a few connections open
  # on the tubes we want.
  def get_queue(queue_name)
    @queue_cache ||= {}
    if @queue_cache.has_key?(queue_name)
      return @queue_cache[queue_name]
    else
      queue = Beanstalk::Pool.new(["#{SERVER_IP}:#{DEFAULT_PORT}"])
      queue.watch(queue_name)
      queue.use(queue_name)
      queue.ignore('default')
      @queue_cache[queue_name] = queue
      return queue
    end
  end
  # Take a message off the queue and process it with the given block.
  def take_msg(queue)
    msg = queue.reserve
    # ybody gives us the content of the message, converted back from YAML
    body = msg.ybody
    if block_given?
      yield(body)
    end
    msg.delete
  end

  # True if a job is waiting on the queue (peek_ready looks without reserving).
  def results_ready?(queue)
    queue.peek_ready != nil
  end

end
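# A hypothetical round trip through BeanBase (not executed here), assuming a
# beanstalkd server is listening on SERVER_IP:DEFAULT_PORT:
#   base  = BeanBase.new
#   queue = base.get_queue('example')
#   queue.yput({ 'hello' => 'world' }, DEFAULT_PRIORITY, 0, TTR)
#   base.take_msg(queue) { |body| puts body['hello'] } # prints "world"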
class BeanDistributor < BeanBase

  def initialize(chunks, points_per_chunk)
    @chunks = chunks
    @points_per_chunk = points_per_chunk
    @messages_out = 0
    @circle_count = 0
  end

  def get_incoming_results(queue)
    if results_ready?(queue)
      result = nil
      take_msg(queue) do |body|
        result = body.count
      end
      @messages_out -= 1
      print "." # display that we received another result
      @circle_count += result
    else
      # do nothing
    end
  end
  def start_distributor
    request_queue = get_queue('requests')
    results_queue = get_queue('results')

    # put all the work on the request queue
    puts "distributor sending out #{@chunks} jobs"
    @chunks.times do |num|
      msg = BeanRequest.new(1, @points_per_chunk)
      # take our Ruby object, convert it to YAML, and put it on the queue
      request_queue.yput(msg, DEFAULT_PRIORITY, 0, TTR)
      @messages_out += 1
      # if there are results waiting, collect them; if not, continue sending work
      get_incoming_results(results_queue)
    end

    # all the work is out; wait for the remaining results
    while @messages_out > 0
      get_incoming_results(results_queue)
    end
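    # Monte Carlo estimate: each point is drawn uniformly from a square of
    # side 2r containing an inscribed circle of radius r. The fraction of
    # points landing inside the circle approaches (pi * r**2) / (2r)**2 = pi / 4,
    # so pi is approximately 4 * hits / total_points.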
    npoints = @chunks * @points_per_chunk
    pi = 4.0 * @circle_count / npoints
    puts "\nreceived all the results, our estimate for pi is: #{pi}"
  end

end
class BeanWorker < BeanBase

  def write_result(queue, result)
    msg = BeanResult.new(1, result)
    queue.yput(msg, DEFAULT_PRIORITY, 0, TTR)
  end

  # Generate one random point and return 1 if it falls inside the circle,
  # 0 otherwise.
  def in_circle
    range = 1000000.0
    radius = range / 2
    xcord = rand(range) - radius
    ycord = rand(range) - radius
    if (xcord**2) + (ycord**2) <= (radius**2)
      return 1
    else
      return 0
    end
  end
  def start_worker
    request_queue = get_queue('requests')
    results_queue = get_queue('results')

    # get requests and do the work until the worker is killed
    while true
      result = 0
      take_msg(request_queue) do |body|
        points = body.count
        points.times { result += in_circle }
      end
      write_result(results_queue, result)
    end
  end

end
############
# These are just simple message classes that we pass over beanstalk using its
# to-YAML (yput) and from-YAML (ybody) helpers.
############
class BeanRequest
  attr_accessor :project_id, :count

  def initialize(project_id, count = 0)
    @project_id = project_id
    @count = count
  end
end

class BeanResult
  attr_accessor :project_id, :count

  def initialize(project_id, count = 0)
    @project_id = project_id
    @count = count
  end
end
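# A sketch of what yput/ybody do with these objects (not executed here;
# assumes a YAML engine that allows loading plain Ruby objects, as the
# Ruby 1.8-era Syck did):
#   require 'yaml'
#   msg = YAML.load(BeanRequest.new(1, 10000).to_yaml)
#   msg.count # => 10000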
# how many different jobs we should do
chunks = 100
# how many points to calculate per chunk
points_per_chunk = 10000
# how many workers we should have
# (normally these run on different machines; in this example we just fork them)
workers = 5

# Most of the time the distributor and the workers would be entirely separate
# programs, but to make this example easy to run we fork and start the server
# and the clients from one script. We wait for the distributor to complete,
# which means it has received all the results it expected.
puts "starting distributor"
server_pid = fork {
BeanDistributor.new(chunks,points_per_chunk).start_distributor
}
puts "starting client(s)"
client_pids = []
workers.times do |num|
client_pid = fork {
BeanWorker.new.start_worker
}
client_pids << client_pid
end
Process.wait(server_pid)
#take down the clients
client_pids.each do |pid|
Process.kill("HUP",pid)
end