# frozen_string_literal: true

require "pg"
require "benchmark"
require "prometheus/client"
require "prometheus/client/tracer"

require_relative "locker"
require_relative "job_timeout_error"
module Que
  class Worker
    DEFAULT_QUEUE = "default"
    # Defines the time (in seconds) a worker will wait before checking Postgres for its next job
    DEFAULT_WAKE_INTERVAL = 5
    DEFAULT_LOCK_CURSOR_EXPIRY = 0 # seconds
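
    # Each of these defaults can be overridden per worker via the keyword
    # arguments to #initialize below; lock_cursor_expiry is passed through to
    # Locker as cursor_expiry.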
    METRICS = [
      RunningSecondsTotal = Prometheus::Client::Counter.new(
        :que_worker_running_seconds_total,
        docstring: "Time since starting to work jobs",
        labels: %i[queue worker],
      ),
      SleepingSecondsTotal = Prometheus::Client::Counter.new(
        :que_worker_sleeping_seconds_total,
        docstring: "Time spent sleeping due to no jobs",
        labels: %i[queue worker],
      ),
      JobWorkedTotal = Prometheus::Client::Counter.new(
        :que_job_worked_total,
        docstring: "Counter for all jobs processed",
        labels: %i[job_class priority queue],
      ),
      JobErrorTotal = Prometheus::Client::Counter.new(
        :que_job_error_total,
        docstring: "Counter for all jobs that were run but errored",
        labels: %i[job_class priority queue],
      ),
      JobWorkedSecondsTotal = Prometheus::Client::Counter.new(
        :que_job_worked_seconds_total,
        docstring: "Sum of the time spent processing each job class",
        labels: %i[job_class priority queue worker],
      ),
      JobLatencySecondsTotal = Prometheus::Client::Counter.new(
        :que_job_latency_seconds_total,
        docstring: "Sum of time spent waiting in queue",
        labels: %i[job_class priority queue],
      ),
    ].freeze
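
    # NOTE: these counters are plain prometheus-client metrics; nothing in this
    # class registers them with a registry. A minimal sketch of how a host
    # application might expose them, assuming the default prometheus-client
    # registry is used:
    #
    #   Que::Worker::METRICS.each { |metric| Prometheus::Client.registry.register(metric) }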
    def initialize(
      queue: DEFAULT_QUEUE,
      wake_interval: DEFAULT_WAKE_INTERVAL,
      lock_cursor_expiry: DEFAULT_LOCK_CURSOR_EXPIRY
    )
      @queue = queue
      @wake_interval = wake_interval
      @locker = Locker.new(queue: queue, cursor_expiry: lock_cursor_expiry)
      @stop = false # instruct worker to stop
      @stopped = false # mark worker as having stopped
    end
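
    # A minimal usage sketch, assuming the host application has already
    # configured Que's adapter and logger:
    #
    #   worker = Que::Worker.new(queue: "default", wake_interval: 5)
    #   thread = Thread.new { worker.work_loop }
    #   # ... on shutdown:
    #   worker.stop!
    #   sleep 0.1 until worker.stopped?
    #   thread.join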
    attr_reader :metrics
    def work_loop
      return if @stop

      Prometheus::Client.trace(RunningSecondsTotal, queue: @queue) do
        loop do
          case event = work
          when :job_not_found, :postgres_error
            Que.logger&.info(event: "que.#{event}", wake_interval: @wake_interval)
            Prometheus::Client.trace(SleepingSecondsTotal, queue: @queue) { sleep(@wake_interval) }
          when :job_worked
            nil # immediately find a new job to work
          end

          break if @stop
        end
      end
    ensure
      @stopped = true
    end
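
    # NOTE: #work below reports its outcome as a symbol: :job_worked when a job
    # was locked and run, :job_not_found when no job was available to lock, and
    # :postgres_error when the connection failed. The loop above sleeps only for
    # the latter two; after :job_worked it immediately polls for the next job.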
    # rubocop:disable Metrics/MethodLength
    # rubocop:disable Metrics/AbcSize
    def work
      Que.adapter.checkout do
        @locker.with_locked_job do |job|
          return :job_not_found if job.nil?

          log_keys = {
            priority: job["priority"],
            queue: job["queue"],
            handler: job["job_class"],
            job_class: job["job_class"],
            job_error_count: job["error_count"],
            que_job_id: job["job_id"],
          }

          labels = {
            job_class: job["job_class"], priority: job["priority"], queue: job["queue"]
          }

          begin
            Que.logger&.info(
              log_keys.merge(
                event: "que_job.job_begin",
                msg: "Job acquired, beginning work",
              )
            )

            klass = class_for(job[:job_class])

            # Note the time spent waiting in the queue before being processed, and update
            # the jobs worked count here so that latency_seconds_total / worked_total
            # doesn't suffer from skew.
            JobLatencySecondsTotal.increment(by: job[:latency], labels: labels)
            JobWorkedTotal.increment(labels: labels)

            duration = Benchmark.measure do
              Prometheus::Client.trace(JobWorkedSecondsTotal, labels) { klass.new(job)._run }
            end.real

            Que.logger&.info(
              log_keys.merge(
                event: "que_job.job_worked",
                msg: "Successfully worked job",
                duration: duration,
              )
            )
          rescue StandardError, JobTimeoutError => error
            JobErrorTotal.increment(labels: labels)
            Que.logger&.error(
              log_keys.merge(
                event: "que_job.job_error",
                msg: "Job failed with error",
                error: error.inspect,
              )
            )

            # For compatibility with que-failure, we need to allow failure handlers to be
            # defined on the job class.
            if klass.respond_to?(:handle_job_failure)
              klass.handle_job_failure(error, job)
            else
              handle_job_failure(error, job)
            end
          end

          :job_worked
        end
      end
    rescue PG::Error => _error
      # In the event that our Postgres connection is bad, we don't want that error to halt
      # the work loop. Instead, we should let the work loop sleep and retry.
      :postgres_error
    end
    # rubocop:enable Metrics/MethodLength
    # rubocop:enable Metrics/AbcSize
    def stop!
      @stop = true
    end

    def stopped?
      @stopped
    end
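
    # NOTE: stop! only sets a flag: a job that is mid-run finishes before the
    # work loop observes @stop and exits, after which stopped? returns true.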
    private
    # Set the error and retry with back-off
    def handle_job_failure(error, job)
      count = job[:error_count].to_i + 1

      Que.execute(
        :set_error, [
          count,
          count**4 + 3, # exponentially back off when retrying failures
          "#{error.message}\n#{error.backtrace.join("\n")}",
          *job.values_at(*Job::JOB_INSTANCE_FIELDS),
        ]
      )
    end
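
    # For reference, count**4 + 3 gives retry delays of 4s, 19s, 84s, 259s and
    # 628s after the first five failures.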
    def class_for(string)
      Que.constantize(string)
    end
  end
end