Skip to content

Instantly share code, notes, and snippets.

@jufemaiz
Created August 22, 2018 01:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jufemaiz/dc016c844038c9b979c8b2dea5c98286 to your computer and use it in GitHub Desktop.
Save jufemaiz/dc016c844038c9b979c8b2dea5c98286 to your computer and use it in GitHub Desktop.
Delayed::Job container based rake tasks
# Shamelessly ripped from Delayed::Job tasks
# Ref: https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/tasks.rb
require 'socket'

# Builds the identifying name given to Delayed::Job workers started by the
# jobs:container:* tasks, so a worker can be traced back to its host, its
# container (when one can be detected) and its process.
module DelayedJobContainerWorker
  # Extracts a container ID from a cgroup file.
  #
  # For each marker in turn, the LAST line containing the marker is taken and
  # everything up to and including its final "/" is removed. The first
  # non-empty result wins. This mirrors the former
  # `cat | grep <marker> | sed 's/^.*\///' | tail -n1` pipeline without
  # spawning a shell (and without stderr noise when the file is missing).
  #
  # @param cgroup_file [String] path of the cgroup file to inspect
  # @param markers [Array<String>] substrings to look for, in priority order
  # @return [String, nil] the container ID, or nil when none can be determined
  def self.detect_container_id(cgroup_file = '/proc/self/cgroup', markers = %w[ecs docker])
    return nil unless File.readable?(cgroup_file)

    lines = File.readlines(cgroup_file).map(&:chomp)
    markers.each do |marker|
      line = lines.reverse_each.find { |candidate| candidate.include?(marker) }
      next unless line

      id = line.sub(%r{\A.*/}, '').strip
      return id unless id.empty?
    end
    nil
  rescue SystemCallError
    # Detection is best effort: an unreadable cgroup file must not stop the
    # worker from starting, it only means the name carries no container part.
    nil
  end

  # @return [String] the machine hostname, or a fixed fallback if the lookup fails
  def self.detect_hostname
    Socket.gethostname
  rescue StandardError
    'DefaultDelayedJobWorker'
  end

  # Assembles the worker name as "host: H container: C pid: P"; the container
  # part is omitted when container_id is nil.
  #
  # @param hostname [String]
  # @param container_id [String, nil]
  # @param pid [Integer, String]
  # @return [String]
  def self.worker_name(hostname: detect_hostname, container_id: detect_container_id, pid: Process.pid)
    [
      "host: #{hostname}",
      (container_id && "container: #{container_id}"),
      "pid: #{pid}"
    ].compact.join(' ')
  end
end

namespace :jobs do
  namespace :container do
    desc 'Clear the delayed_job queue.'
    task clear: :environment do
      Delayed::Job.delete_all
    end

    desc 'Start a delayed_job worker.'
    task work: :environment_options do
      worker = Delayed::Worker.new(@worker_options)
      worker.name = @worker_name
      worker.start
    end

    desc 'Start a delayed_job worker and exit when all available jobs are complete.'
    task workoff: :environment_options do
      worker = Delayed::Worker.new(@worker_options.merge(exit_on_complete: true))
      worker.name = @worker_name
      worker.start
    end

    # Prepares @worker_options and @worker_name for the work/workoff tasks.
    #
    # Options are read from the environment: MIN_PRIORITY, MAX_PRIORITY,
    # QUEUES (or QUEUE, comma separated), QUIET, and - only when set -
    # SLEEP_DELAY and READ_AHEAD (both converted with to_i).
    task environment_options: :environment do
      @worker_options = {
        min_priority: ENV['MIN_PRIORITY'],
        max_priority: ENV['MAX_PRIORITY'],
        queues: (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','),
        quiet: ENV['QUIET']
      }
      @worker_options[:sleep_delay] = ENV['SLEEP_DELAY'].to_i if ENV['SLEEP_DELAY']
      @worker_options[:read_ahead] = ENV['READ_AHEAD'].to_i if ENV['READ_AHEAD']

      @worker_name = DelayedJobContainerWorker.worker_name
    end

    desc "Exit with error status if any jobs older than max_age seconds haven't been attempted yet."
    task :check, [:max_age] => :environment do |_, args|
      args.with_defaults(max_age: 300)

      cutoff = Time.now - args[:max_age].to_i
      unprocessed_jobs = Delayed::Job.where('attempts = 0 AND created_at < ?', cutoff).count

      if unprocessed_jobs.positive?
        raise "#{unprocessed_jobs} jobs older than #{args[:max_age]} seconds have not been processed yet"
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment