Skip to content

Instantly share code, notes, and snippets.

@joevandyk
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joevandyk/9419422 to your computer and use it in GitHub Desktop.
Save joevandyk/9419422 to your computer and use it in GitHub Desktop.
require 'que'
require 'logger'
require 'sequel'
# Warm up MultiJson (only when it has been loaded) by parsing a trivial
# document here, so its search for the most efficient JSON backend happens
# once up front rather than inside every worker process.
MultiJson.load('[]') if defined?(MultiJson)
# Sequel handle for the local development database; it is handed to Que as
# its connection by the after-fork hook registered further down.
DB = Sequel.connect("postgres://localhost/tanga_dev")

# Send Que's log output to standard error at DEBUG verbosity.
# The severity constant is referenced directly instead of being looked up
# by name with const_get, so a typo fails at parse/load time and the intent
# is obvious to readers.
Que.logger = Logger.new($stderr)
Que.logger.level = Logger::DEBUG
# Minimal job used to exercise the worker: it completes immediately.
class QuickJob < Que::Job
  # Writes "finished" to standard output and returns.
  def run
    $stdout.puts "finished"
  end
end
# Slow job used to exercise the worker: it blocks for five seconds between
# its start and finish messages.
class LongJob < Que::Job
  # Announces the start, sleeps for 5 seconds, then announces completion.
  def run
    $stdout.puts "starting long job"
    sleep(5)
    $stdout.puts "finished long job"
  end
end
module Que
  # Dual-purpose after-fork hook.
  #
  # * Called with a block: stores the block for later and runs nothing.
  # * Called without a block: logs the call, then invokes every stored block
  #   in the order it was registered.
  #
  # Returns the array of stored blocks in both cases.
  def self.after_fork(&block)
    @blocks ||= []
    return @blocks << block if block

    Que.logger.info "Que.after_fork called"
    @blocks.each(&:call)
  end
end
# Application-specific reconnect step, registered to run in each worker
# process after the fork: drop the Sequel connections, then hand the
# database handle to Que as its connection.
reconnect_after_fork = lambda do
  DB.disconnect
  Que.connection = DB
end
Que.after_fork(&reconnect_after_fork)
# Pid of the most recently forked worker process (nil until the first fork).
worker_pid = nil
# Becomes true once an INT has been received; the supervising loop below
# checks it before forking another worker.
stop = false

# On INT: record the shutdown request and forward the signal to the worker.
trap('INT') do
  $stderr.puts "Asking worker process to stop..."
  stop = true
  begin
    Process.kill('INT', worker_pid) if worker_pid
  rescue Errno::ESRCH
    # worker_pid can refer to a worker that has already exited (for example
    # between Process.wait returning and the next fork). There is nothing
    # left to signal, and letting the error escape the handler would raise
    # it in the main thread and abort the supervisor mid-shutdown.
  end
end
# Supervisor loop: fork a worker, wait for it to exit, and repeat until a
# stop has been requested. Each worker polls for jobs and exits after the
# first result that is not :job_unavailable, so every job gets a fresh
# process.
loop do
  break if stop

  worker_pid = fork do
    # Install the worker's own INT handler before anything else. Until this
    # runs, the child still carries the handler inherited from the parent
    # (which prints and signals worker_pid). The inherited `stop` flag is
    # deliberately not reset, so an INT that arrives while the after-fork
    # hooks are running is not forgotten.
    trap('INT') { stop = true }
    Que.after_fork

    loop do
      break if stop

      result = Que::Job.work
      # Tolerate a nil result in both branches, not only in the comparison.
      event = result && result[:event]
      if event == :job_unavailable
        # No jobs worked, check again.
        sleep 0.1
      else
        # Leave the loop so the parent forks a fresh worker process.
        Que.logger.info "job status: #{event}"
        break
      end
    end
  end
  Process.wait(worker_pid)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment