@stevecj
Last active December 11, 2015 08:58
Cleanly stoppable beaneater/beanstalkd agent. Waits for the currently executing task to complete before stopping in response to an INT (Ctrl+C) or TERM (kill) signal, or to an enqueued stop job.
#!/usr/bin/env ruby

require 'rubygems'
require 'bundler'
require 'beaneater'

beanstalk = Beaneater::Pool.new(['localhost:11300'])
sleep_tube = beanstalk.tubes['sleeper-take-a-nap']
sleep_tube.put '-'
beanstalk.close

#!/usr/bin/env ruby

require 'rubygems'
require 'bundler'
require 'beaneater'

beanstalk = Beaneater::Pool.new(['localhost:11300'])
stop_tube = beanstalk.tubes['sleeper-stop']
stop_tube.put '-'
beanstalk.close

#!/usr/bin/env ruby
# Demonstrates how to implement an agent using beaneater/beanstalkd that can
# be cleanly stopped after waiting for any currently running tasks to complete.
# The agent can be stopped either by enqueuing a stop request or by sending a
# SIGINT (e.g. Ctrl+C from the console) or SIGTERM (e.g. `kill`) signal.
#
# Shutting down cleanly by enqueuing a task is simple because a registered job
# can terminate the `process!` loop by raising Beaneater::AbortProcessingError.
# Since the agent only processes one request at a time, any request that is in
# progress will finish before the stop request can be handled.
#
# To make signal-handling work, we trap the signal, open another connection to
# the beanstalkd pool, and use that to enqueue a stop job.

require 'rubygems'
require 'bundler'
require 'beaneater'
require 'fileutils'

BeanstalkdPoolUrl = 'localhost:11300'

class Sleeper
  SleepTubeName = 'sleeper-take-a-nap'
  StopTubeName = 'sleeper-stop'

  def process!
    beanstalk = Beaneater::Pool.new([BeanstalkdPoolUrl])

    beanstalk.jobs.register SleepTubeName, :retry_on => [StandardError] do |job|
      touch_file
    end

    beanstalk.jobs.register StopTubeName do |job|
      stop
    end

    beanstalk.jobs.process!
    beanstalk.close
  end

  def stop!
    beanstalk = Beaneater::Pool.new([BeanstalkdPoolUrl])
    tube = beanstalk.tubes[StopTubeName]
    tube.put '-'
    beanstalk.close
  end

  private

  def touch_file
    FileUtils.touch 'sleep-started'
    sleep 10
    FileUtils.touch 'sleep-finished'
  end

  def stop
    raise Beaneater::AbortProcessingError
  end
end

sleeper = Sleeper.new

Signal.trap('INT') { sleeper.stop! }
Signal.trap('TERM') { sleeper.stop! }

sleeper.process!
puts 'good night'
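
The stop-by-enqueued-job idea can be tried without a running beanstalkd. The following is only a rough sketch under stated assumptions: `TinyWorker` and `AbortProcessing` are hypothetical stand-ins (not part of beaneater), and a single in-memory FIFO `Queue` replaces the two tubes, which simplifies how beanstalkd actually chooses the next job. It illustrates why a job that is already running finishes before the stop request is handled: the worker takes one job at a time, and the stop handler ends the loop by raising.

```ruby
require 'thread' # provides Queue on older Rubies; harmless on newer ones

# Hypothetical stand-in for Beaneater::AbortProcessingError.
class AbortProcessing < StandardError; end

# Hypothetical in-memory model of an agent that handles one job at a time.
class TinyWorker
  attr_reader :log

  def initialize
    @queue = Queue.new # stands in for the beanstalkd tubes
    @handlers = {}
    @log = []
  end

  # Associate a handler block with a job name (a "tube" in this model).
  def register(name, &block)
    @handlers[name] = block
  end

  # Enqueue a job for the named handler.
  def put(name, body = '-')
    @queue << [name, body]
  end

  # Number of jobs still waiting in the queue.
  def pending
    @queue.size
  end

  # Run jobs strictly one after another until a handler raises
  # AbortProcessing. Any job already running has returned by then.
  def process!
    loop do
      name, body = @queue.pop
      @handlers.fetch(name).call(body)
    end
  rescue AbortProcessing
    @log << 'stopped'
  end
end

worker = TinyWorker.new

worker.register('nap') do |_body|
  worker.log << 'nap started'
  worker.log << 'nap finished'
end

worker.register('stop') { |_body| raise AbortProcessing }

worker.put 'nap'
worker.put 'stop'
worker.put 'nap' # enqueued after the stop request, so it is never run

worker.process!
p worker.log # prints ["nap started", "nap finished", "stopped"]
```

In this model the second nap stays in the queue after the worker stops, which is roughly what one would expect of jobs left in a tube for the next agent run.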
@stravid

stravid commented Nov 7, 2015

@stevecj what version of beaneater are you using? I'm trying your sleeper worker but can't get it to work, it always hangs on the put in line 46.
