This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'que' | |
require 'logger' | |
require 'sequel' | |
# Preload MultiJson's code for finding most efficient json loader | |
# so we don't need to do this inside each worker process. | |
if defined?(MultiJson) | |
MultiJson.load('[]') | |
end | |
DB = Sequel.connect("postgres://localhost/tanga_dev") | |
Que.logger = Logger.new(STDERR) | |
Que.logger.level = Logger.const_get('DEBUG') | |
class QuickJob < Que::Job | |
def run | |
puts "finished" | |
end | |
end | |
class LongJob < Que::Job | |
def run | |
puts "starting long job" | |
sleep 5 | |
puts "finished long job" | |
end | |
end | |
module Que | |
def self.after_fork(&block) | |
@blocks ||= [] | |
# If block given, remember the setup | |
if block_given? | |
@blocks << block | |
else | |
Que.logger.info "Que.after_fork called" | |
@blocks.each { |b| b.call } | |
end | |
end | |
end | |
# My app's custom reconnect code after the fork | |
Que.after_fork do | |
DB.disconnect | |
Que.connection = DB | |
end | |
worker_pid = nil | |
stop = false | |
trap('INT') do | |
$stderr.puts "Asking worker process to stop..." | |
stop = true | |
Process.kill('INT', worker_pid) if worker_pid | |
end | |
loop do | |
break if stop | |
worker_pid = fork do | |
Que.after_fork | |
stop = false | |
trap('INT') {stop = true} | |
loop do | |
break if stop | |
result = Que::Job.work | |
if result && result[:event] == :job_unavailable | |
# No jobs worked, check again. | |
sleep 0.1 | |
else | |
# stop and rework new worker process | |
Que.logger.info "job status: #{ result[:event] }" | |
break | |
end | |
end | |
end | |
Process.wait(worker_pid) | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment