Skip to content

Instantly share code, notes, and snippets.

@jturkel
Last active December 26, 2015 21:59
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jturkel/7220372 to your computer and use it in GitHub Desktop.
Save jturkel/7220372 to your computer and use it in GitHub Desktop.
# Settings for the job-worker heartbeat mechanism
config.jobs = ActiveSupport::OrderedOptions.new.tap do |jobs|
  # Whether workers report heartbeats at all
  jobs.heartbeat_enabled = true
  # Seconds between two heartbeats sent by a worker
  jobs.heartbeat_interval_seconds = 60
  # Seconds a worker may stay silent before it is considered dead
  jobs.heartbeat_timeout_seconds = 3 * 60
  # Seconds between two checks for dead workers
  jobs.dead_worker_polling_interval_seconds = 60
end
require File.expand_path('../../config/boot', __FILE__)
require File.expand_path('../../config/environment', __FILE__)
require 'clockwork'
include Clockwork
# Periodically release jobs that are still locked by workers which stopped
# sending heartbeats. Only scheduled when heartbeats are enabled.
if Rails.configuration.jobs.heartbeat_enabled
  every(Rails.configuration.jobs.dead_worker_polling_interval_seconds.seconds, 'Unlock orphaned jobs') do
    # The sweep is defined as a module function on Delayed::Heartbeat
    Delayed::Heartbeat.unlock_orphaned_jobs
  end
end
# Creates the delayed_workers table: one row per worker, holding the worker's
# name and the time of its last heartbeat. Worker names are unique.
class CreateDelayedWorkers < ActiveRecord::Migration
  def change
    create_table :delayed_workers do |table|
      table.string :name
      table.timestamp :last_heartbeat_at
    end

    add_index :delayed_workers, :name, unique: true
  end
end
module Delayed
  module Heartbeat
    # Releases jobs that are locked by workers which no longer report heartbeats.
    #
    # Works in two steps:
    # 1. deletes every worker row whose last heartbeat is older than
    #    +timeout_seconds+;
    # 2. clears the lock (+locked_at+ / +locked_by+) of every locked job whose
    #    +locked_by+ does not match the name of a remaining worker row, and
    #    increments the job's +attempts+ by one.
    #
    # NOTE(review): the two statements are not wrapped in a transaction, and a
    # NULL worker name would make the NOT IN predicate match nothing - confirm
    # worker names are always present.
    #
    # @param timeout_seconds [Numeric] seconds without a heartbeat after which a
    #   worker is treated as dead; defaults to
    #   Rails.configuration.jobs.heartbeat_timeout_seconds
    # @return the result of +update_all+ on the orphaned jobs
    def self.unlock_orphaned_jobs(timeout_seconds = Rails.configuration.jobs.heartbeat_timeout_seconds)
      WorkerModel.dead_workers(timeout_seconds).delete_all
      orphaned_jobs = Delayed::Job.where("locked_at IS NOT NULL AND " \
                                         "locked_by NOT IN (#{WorkerModel.active_names.to_sql})")
      orphaned_jobs.update_all('locked_at = NULL, locked_by = NULL, attempts = attempts + 1')
    end
  end
end
module Delayed
  module Heartbeat
    # Delayed::Job plugin that starts a WorkerHeartbeat before a worker's
    # :execute lifecycle event and stops it again afterwards.
    #
    # NOTE(review): @heartbeat is stored on whatever object the callbacks block
    # is evaluated against - presumably shared per process; confirm.
    class Plugin < Delayed::Plugin
      callbacks do |lifecycle|
        # Only start a heartbeat when the feature is switched on
        lifecycle.before(:execute) do |worker|
          if Rails.configuration.jobs.heartbeat_enabled
            @heartbeat = WorkerHeartbeat.new(worker.name)
          end
        end

        # Nothing to stop when no heartbeat was ever started
        lifecycle.after(:execute) do |_worker|
          @heartbeat && @heartbeat.stop
        end
      end
    end
  end
end
# Register the heartbeat plugin by appending it to Delayed::Worker's plugin list
Delayed::Worker.plugins << Delayed::Heartbeat::Plugin
module Delayed
  module Heartbeat
    # Heartbeat for a single worker. On creation it (re)registers a WorkerModel
    # row for the worker and starts a thread that refreshes the row's heartbeat
    # every heartbeat interval until #stop is called.
    class WorkerHeartbeat
      # @param worker_name [String] name stored on the worker's WorkerModel row
      def initialize(worker_name)
        @worker_model = create_worker_model(worker_name)
        # Use a self-pipe to safely shutdown the heartbeat thread
        @stop_reader, @stop_writer = IO.pipe
        @heartbeat_thread = Thread.new { run_heartbeat_loop }
        # We don't want the worker to continue running if the
        # heartbeat can't be written
        @heartbeat_thread.abort_on_exception = true
      end

      # @return [Boolean] whether the heartbeat thread is still running
      def alive?
        @heartbeat_thread.alive?
      end

      # Asks the heartbeat thread to shut down. Does not wait for the thread
      # to finish. Calling it more than once is a no-op.
      def stop
        # Use the self-pipe to tell the heartbeat thread to cleanly
        # shutdown
        if @stop_writer
          @stop_writer.write_nonblock('stop')
          @stop_writer.close
          @stop_writer = nil
        end
      end

      private

      # Replaces any existing row for +worker_name+ with a fresh one.
      def create_worker_model(worker_name)
        WorkerModel.transaction do
          # Just recreate the worker model to avoid the race condition where
          # it gets deleted before we can update its last heartbeat
          WorkerModel.where(name: worker_name).destroy_all
          WorkerModel.create!(name: worker_name)
        end
      end

      # Body of the heartbeat thread: sleep one interval, write a heartbeat,
      # repeat until the stop pipe becomes readable. Any failure is logged and
      # re-raised; the worker row and the pipe reader are always cleaned up.
      def run_heartbeat_loop
        until sleep_interruptibly(heartbeat_interval)
          @worker_model.update_heartbeat
        end
      rescue Exception => e # deliberately broad: log everything, then re-raise
        # Join frames with a real newline (a single-quoted '\n' is a literal
        # backslash followed by "n"); tolerate exceptions without a backtrace
        backtrace = Array(e.backtrace).join("\n")
        Rails.logger.error("Worker heartbeat error: #{e.message}: #{backtrace}")
        raise
      ensure
        Rails.logger.info('Shutting down worker heartbeat thread')
        @stop_reader.close
        @worker_model.delete
        Delayed::Backend::ActiveRecord::Job.clear_active_connections!
      end

      # Seconds between two heartbeats, read from the Rails configuration.
      def heartbeat_interval
        Rails.configuration.jobs.heartbeat_interval_seconds
      end

      # Sleeps for up to +secs+ seconds.
      # Returns a truthy value if the sleep was interrupted, nil on timeout.
      def sleep_interruptibly(secs)
        IO.select([@stop_reader], nil, nil, secs)
      end
    end
  end
end
module Delayed
  module Heartbeat
    # ActiveRecord model over the delayed_workers table; each row carries a
    # worker's name and the time of its last heartbeat.
    class WorkerModel < ActiveRecord::Base
      self.table_name = 'delayed_workers'

      attr_accessible :name, :last_heartbeat_at

      # Stamp new rows with the current UTC time unless a heartbeat time was given
      before_create { |record| record.last_heartbeat_at ||= Time.now.utc }

      class << self
        # Workers whose last heartbeat is older than +timeout_seconds+ seconds.
        def dead_workers(timeout_seconds)
          cutoff = Time.now.utc - timeout_seconds.seconds
          where('last_heartbeat_at < ?', cutoff)
        end

        # Relation selecting only the name column of the worker rows.
        def active_names
          select(:name)
        end
      end

      # Writes the current UTC time to last_heartbeat_at via update_column.
      def update_heartbeat
        update_column(:last_heartbeat_at, Time.now.utc)
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment