Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

Avoid time-of-measurement bias with Prometheus

This gist collects some of the relevant files used to produce the blog post Avoid time-of-measurement bias with Prometheus. It's not intended to provide a full working copy of the source code, but it is a more full-bodied implementation than the one given in the blog post, hopefully helping anyone who wants to use this pattern in their own work.

The worker.rb is an extract from GoCardless' fork of chanks/que which is instrumented fully with Prometheus metrics. seed-jobs is used to generate a sample of jobs across duration ranges for the purpose of testing.

# frozen_string_literal: true

# Rack application that serves Prometheus metrics for the job workers.
require "prometheus/client"
require "prometheus/client/tracer"
require "prometheus/middleware/trace_collector"
require "prometheus/middleware/exporter"

# Rack middleware DSL
# NOTE(review): `workers` is unused in this snippet — presumably it keeps the
# worker references alive for the process lifetime; confirm against the full
# source.
workers = start_workers # Array[Worker]

# Collect each trace before serving the metrics, so in-progress durations are
# flushed into the counters at scrape time (this is the point of the tracer).
use Prometheus::Middleware::TraceCollector
use Prometheus::Middleware::Exporter
# frozen_string_literal: true

# Gemfile: the only direct dependency is the tracer extension to the
# Prometheus client, which pulls in prometheus-client transitively.
source "https://rubygems.org"

gem "prometheus-client-tracer", "~> 1.0"
GEM
remote: https://rubygems.org/
specs:
prometheus-client (0.10.0.pre.alpha.2)
prometheus-client-tracer (1.0.0)
prometheus-client (~> 0.10.0.alpha)
PLATFORMS
ruby
DEPENDENCIES
prometheus-client-tracer (~> 1.0)
BUNDLED WITH
2.0.1
#!/usr/bin/env ruby
# frozen_string_literal: true
require_relative "setup"
module Jobs
  # Synthetic job used for load testing: it does nothing but sleep for
  # `interval` seconds, simulating work of a configurable duration.
  class Sleep < Que::Job
    def run(interval)
      sleep(interval)
    end
  end
end
# Require exactly three positional arguments; otherwise print usage and abort.
unless ARGV.count == 3
  puts(<<-USAGE)
Desc: Repopulate que_jobs with a random selection of jobs
Usage: seed-jobs <number-of-jobs> <duration-range> <priority-range>
Examples...
seed-jobs 100_000 0..0.5 1..25
seed-jobs 5_000 0 1
  USAGE
  exit(-1) # exits with status 255; conventionally a usage error would be exit(1)
end
# Parse a CLI range token ("1..25") or a scalar token ("5") into an array of
# candidate values to sample from.
#
# Integer ranges expand to every integer in the (inclusive) range. Ranges
# containing a decimal point (e.g. "0..0.5", as advertised in the usage
# examples) previously collapsed to a single integer because both bounds were
# coerced with String#to_i; they now expand to 101 evenly spaced floats across
# the interval, endpoints included.
#
# Anything unparseable (including scalar tokens, where Range.new raises on a
# single argument) falls back to a one-element integer array.
#
# @param token [String] e.g. "1..25", "0..0.5", or "5"
# @return [Array<Numeric>]
def parse_range(token)
  bounds = token.split("..")
  if bounds.size == 2 && bounds.any? { |bound| bound.include?(".") }
    lo, hi = bounds.map(&:to_f)
    # 101 evenly spaced samples, inclusive of both endpoints.
    (0..100).map { |i| lo + ((hi - lo) * i) / 100.0 }
  else
    [*Range.new(*bounds.map(&:to_i))]
  end
rescue StandardError
  [token.to_i]
end
# Snapshot a single timestamp so every seeded job shares the same base time.
now = Time.now
no_of_jobs = ARGV[0].to_i # String#to_i honours "_" separators ("100_000" => 100000)
duration_range = parse_range(ARGV[1])
priority_range = parse_range(ARGV[2])
Que.logger.info(msg: "truncating que_jobs table")
# Destructive: wipes all existing jobs before seeding.
ActiveRecord::Base.connection.execute("TRUNCATE que_jobs;")
Que.logger.info(
  msg: "seeding database",
  now: now,
  no_of_jobs: no_of_jobs,
  duration_range: duration_range,
  priority_range: priority_range,
)
# Enqueue everything in one transaction so a partial seed never persists.
ActiveRecord::Base.transaction do
  no_of_jobs.times do
    Jobs::Sleep.enqueue(
      # Random.rand ** 2 biases the index toward 0, skewing the sample
      # toward shorter durations (more short jobs than long ones).
      duration_range[(duration_range.size * (Random.rand ** 2)).to_i],
      # Spread the run_at times across a one-second window after `now`.
      run_at: now + Random.rand,
      # Priorities are sampled uniformly, unlike durations.
      priority: priority_range[Random.rand(priority_range.size)],
    )
  end
end
Que.logger.info(msg: "finished seeding database", jobs_in_table: QueJob.count)
# frozen_string_literal: true
require "pg"
require "benchmark"
require "prometheus/client"
require "prometheus/client/tracer"
require_relative "locker"
require_relative "job_timeout_error"
module Que
  # Worker repeatedly locks and runs jobs from a single queue, instrumenting
  # every phase (running, sleeping, working) with Prometheus counters so that
  # time-in-state is accumulated continuously via the tracer, rather than only
  # at job completion — avoiding time-of-measurement bias.
  class Worker
    DEFAULT_QUEUE = "default"
    # Time (in seconds) a worker will wait before checking Postgres for its
    # next job, after finding the queue empty or hitting a Postgres error.
    DEFAULT_WAKE_INTERVAL = 5
    DEFAULT_LOCK_CURSOR_EXPIRY = 0 # seconds

    # Every counter this worker exports. Each is also bound to a constant so
    # the methods below can reference the metric directly.
    METRICS = [
      RunningSecondsTotal = Prometheus::Client::Counter.new(
        :que_worker_running_seconds_total,
        docstring: "Time since starting to work jobs",
        labels: %i[queue worker],
      ),
      SleepingSecondsTotal = Prometheus::Client::Counter.new(
        :que_worker_sleeping_seconds_total,
        docstring: "Time spent sleeping due to no jobs",
        labels: %i[queue worker],
      ),
      JobWorkedTotal = Prometheus::Client::Counter.new(
        :que_job_worked_total,
        docstring: "Counter for all jobs processed",
        labels: %i[job_class priority queue],
      ),
      JobErrorTotal = Prometheus::Client::Counter.new(
        :que_job_error_total,
        docstring: "Counter for all jobs that were run but errored",
        labels: %i[job_class priority queue],
      ),
      JobWorkedSecondsTotal = Prometheus::Client::Counter.new(
        :que_job_worked_seconds_total,
        docstring: "Sum of the time spent processing each job class",
        labels: %i[job_class priority queue worker],
      ),
      JobLatencySecondsTotal = Prometheus::Client::Counter.new(
        :que_job_latency_seconds_total,
        docstring: "Sum of time spent waiting in queue",
        labels: %i[job_class priority queue],
      ),
    ].freeze

    # @param queue [String] queue name this worker consumes
    # @param wake_interval [Numeric] seconds to sleep when no job is found
    # @param lock_cursor_expiry [Numeric] seconds before the locker resets its
    #   lock cursor back to the head of the queue
    def initialize(
      queue: DEFAULT_QUEUE,
      wake_interval: DEFAULT_WAKE_INTERVAL,
      lock_cursor_expiry: DEFAULT_LOCK_CURSOR_EXPIRY
    )
      @queue = queue
      @wake_interval = wake_interval
      @locker = Locker.new(queue: queue, cursor_expiry: lock_cursor_expiry)
      @stop = false # instruct worker to stop
      @stopped = false # mark worker as having stopped
      # Fix: @metrics was never assigned, so the attr_reader below always
      # returned nil. Expose the counters this class maintains.
      @metrics = METRICS
    end

    attr_reader :metrics

    # Main loop: work jobs back-to-back, sleeping @wake_interval when the
    # queue is empty or Postgres errors. The whole loop runs inside a trace of
    # RunningSecondsTotal so "alive" time accrues continuously; the sleep is
    # separately traced into SleepingSecondsTotal.
    def work_loop
      return if @stop

      Prometheus::Client.trace(RunningSecondsTotal, queue: @queue) do
        loop do
          case event = work
          when :job_not_found, :postgres_error
            Que.logger&.info(event: "que.#{event}", wake_interval: @wake_interval)
            Prometheus::Client.trace(SleepingSecondsTotal, queue: @queue) { sleep(@wake_interval) }
          when :job_worked
            nil # immediately find a new job to work
          end

          break if @stop
        end
      end
    ensure
      @stopped = true
    end

    # Lock and run a single job.
    #
    # @return [Symbol] :job_worked, :job_not_found, or :postgres_error
    # rubocop:disable Metrics/MethodLength
    # rubocop:disable Metrics/AbcSize
    def work
      Que.adapter.checkout do
        @locker.with_locked_job do |job|
          return :job_not_found if job.nil?

          log_keys = {
            priority: job["priority"],
            queue: job["queue"],
            handler: job["job_class"],
            job_class: job["job_class"],
            job_error_count: job["error_count"],
            que_job_id: job["job_id"],
          }
          labels = {
            job_class: job["job_class"], priority: job["priority"], queue: job["queue"]
          }

          begin
            Que.logger&.info(
              log_keys.merge(
                event: "que_job.job_begin",
                msg: "Job acquired, beginning work",
              )
            )

            # NOTE(review): the job hash is read with String keys everywhere
            # else in this method (job["priority"], job["queue"], ...), so the
            # previously-Symbol lookups here (job[:job_class], job[:latency])
            # would have returned nil unless the hash is indifferent-access.
            # Normalised to String keys for consistency.
            klass = class_for(job["job_class"])

            # Note the time spent waiting in the queue before being processed,
            # and update the jobs worked count here so that
            # latency_seconds_total / worked_total doesn't suffer from skew.
            JobLatencySecondsTotal.increment(by: job["latency"], labels: labels)
            JobWorkedTotal.increment(labels: labels)

            duration = Benchmark.measure do
              Prometheus::Client.trace(JobWorkedSecondsTotal, labels) { klass.new(job)._run }
            end.real

            Que.logger&.info(
              log_keys.merge(
                event: "que_job.job_worked",
                msg: "Successfully worked job",
                duration: duration,
              )
            )
          rescue StandardError, JobTimeoutError => error
            JobErrorTotal.increment(labels: labels)
            Que.logger&.error(
              log_keys.merge(
                event: "que_job.job_error",
                msg: "Job failed with error",
                error: error.inspect,
              )
            )

            # For compatibility with que-failure, we need to allow failure
            # handlers to be defined on the job class. If class_for raised,
            # klass is nil here and nil.respond_to? is false, so we fall back
            # to the default handler below.
            if klass.respond_to?(:handle_job_failure)
              klass.handle_job_failure(error, job)
            else
              handle_job_failure(error, job)
            end
          end

          :job_worked
        end
      end
    rescue PG::Error => _error
      # In the event that our Postgres connection is bad, we don't want that
      # error to halt the work loop. Instead, we should let the work loop
      # sleep and retry.
      :postgres_error
    end
    # rubocop:enable Metrics/MethodLength
    # rubocop:enable Metrics/AbcSize

    # Ask the worker to stop after the job it is currently working (if any).
    def stop!
      @stop = true
    end

    # True once the work loop has actually exited.
    def stopped?
      @stopped
    end

    private

    # Set the error and retry with back-off.
    def handle_job_failure(error, job)
      # NOTE(review): was job[:error_count]; normalised to the String key used
      # by the rest of this class (a nil Symbol lookup would have made `count`
      # always 1, defeating the exponential back-off).
      count = job["error_count"].to_i + 1

      Que.execute(
        :set_error, [
          count,
          count**4 + 3, # exponentially back off when retrying failures
          "#{error.message}\n#{error.backtrace.join("\n")}",
          *job.values_at(*Job::JOB_INSTANCE_FIELDS),
        ]
      )
    end

    # Resolve a job class name (String) to the class itself.
    def class_for(string)
      Que.constantize(string)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.