Skip to content

Instantly share code, notes, and snippets.

@marckohlbrugge
Created February 29, 2024 09:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marckohlbrugge/e345fd64c587d77ceedd32a61308143c to your computer and use it in GitHub Desktop.
Save marckohlbrugge/e345fd64c587d77ceedd32a61308143c to your computer and use it in GitHub Desktop.
Attempt at adding throttling to GoodJob (work in progress)
# Adds time-window throttling to ActiveJob classes that run on GoodJob.
#
# Including the module mixes in GoodJob's Concurrency extension, defines the
# throttle settings as class attributes, and registers a +before_perform+
# callback. The callback raises GoodJobThrottleExceededError unless the job is
# one of the first +throttle_count+ matching jobs performed inside the last
# +throttle_period+, or fewer than +throttle_count+ such jobs exist.
#
# Throttling stays disabled until the job class calls +good_job_throttle_with+.
#
# @example
#   class TestJob < ActiveJob::Base
#     include GoodJobThrottleExtension
#     good_job_throttle_with count: 1, period: 1.minute, key: ->(job) { job.arguments.first }
#   end
module GoodJobThrottleExtension
  extend ActiveSupport::Concern

  # Raised by the before_perform callback when the throttle limit is reached.
  # Subclasses GoodJob's ConcurrencyExceededError.
  GoodJobThrottleExceededError = Class.new(GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError)

  included do
    include GoodJob::ActiveJobExtensions::Concurrency

    class_attribute :throttle_enabled, default: false
    class_attribute :throttle_count, default: 2
    class_attribute :throttle_period, default: 1.minute
    # Callable settings are always invoked with the job, so the default has to
    # accept it (a zero-arity lambda would raise ArgumentError when called).
    class_attribute :throttle_key, default: ->(job) { job.class.name }

    before_perform do |job|
      next unless job.class.throttle_enabled

      # Each setting may be a plain value or a callable that receives the job.
      resolve = ->(setting) { setting.respond_to?(:call) ? setting.call(job) : setting }

      throttle_key = resolve.call(job.class.throttle_key)
      # A nil/blank key means "do not throttle this job".
      next if throttle_key.blank?

      throttle_count = resolve.call(job.class.throttle_count)
      throttle_period = resolve.call(job.class.throttle_period)

      # Run the check while holding an advisory lock named after the key.
      GoodJob::Execution.advisory_lock_key("throttle_#{throttle_key}", function: "pg_advisory_lock") do
        # NOTE(review): this compares the error column with the bare class
        # name; confirm that matches how GoodJob serializes errors, otherwise
        # previously throttled attempts are not excluded here.
        # NOTE(review): rows are matched on concurrency_key, which this module
        # never sets itself -- presumably it relies on the Concurrency
        # extension being configured with the same key; verify.
        allowed_active_job_ids =
          GoodJob::Job
            .where.not(error: GoodJobThrottleExceededError.to_s)
            .or(GoodJob::Job.where(error: nil))
            .where("performed_at > ?", throttle_period.ago)
            .where(job_class: job.class.to_s)
            .where(concurrency_key: throttle_key)
            .order(performed_at: :asc)
            .limit(throttle_count)
            .pluck(:active_job_id)

        job_allowed = allowed_active_job_ids.include?(job.job_id)
        within_threshold = allowed_active_job_ids.size < throttle_count

        raise GoodJobThrottleExceededError unless job_allowed || within_threshold
      end
    end
  end

  class_methods do
    # Enables throttling for the job class and stores its settings.
    #
    # Every argument may be a plain value or a callable; callables are invoked
    # with the job instance each time the before_perform callback runs.
    #
    # @param count [Integer, #call] how many jobs may be performed per period
    # @param period [ActiveSupport::Duration, #call] length of the look-back window
    # @param key [Object, #call, nil] throttle key; a nil/blank key disables
    #   throttling for that job
    # @return [Object] the assigned key
    def good_job_throttle_with(count: 1, period: 1.minute, key: ->(job) { job.class.name })
      self.throttle_enabled = true
      self.throttle_count = count
      self.throttle_period = period
      self.throttle_key = key
    end
  end
end
# frozen_string_literal: true
require "rails_helper"
# Specs for GoodJobThrottleExtension. They swap the ActiveJob test adapter for
# a GoodJob adapter in external execution mode and stub a fresh TestJob class
# for every example.
RSpec.describe GoodJobThrottleExtension do
  before do
    ActiveJob::Base.disable_test_adapter
    ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)

    stub_const "JOB_PERFORMED", Concurrent::AtomicBoolean.new(false)

    # Baseline job: includes the extension but never configures throttling.
    unconfigured_job = Class.new(ActiveJob::Base) do
      include GoodJobThrottleExtension

      def perform(name:)
        sleep(1) if name
        JOB_PERFORMED.make_true
      end
    end
    stub_const "TestJob", unconfigured_job
  end

  describe "when extension is only included but not configured" do
    it "does not throttle" do
      expect {
        TestJob.perform_later(name: "Alice")
        GoodJob.perform_inline
      }.not_to raise_error
    end
  end

  describe "when throttle key returns nil" do
    it "does not throttle" do
      TestJob.good_job_throttle_with(count: 1, period: 1.minute, key: ->(_job) {})

      # NOTE(review): only enqueueing is asserted here; the jobs are never performed.
      2.times { expect(TestJob.perform_later(name: "Alice")).to be_present }
    end
  end

  describe "when throttle key is nil" do
    it "does not throttle" do
      TestJob.good_job_throttle_with(count: 1, period: 1.minute, key: nil)

      # NOTE(review): only enqueueing is asserted here; the jobs are never performed.
      2.times { expect(TestJob.perform_later(name: "Alice")).to be_present }
    end
  end

  describe ".good_job_throttle_with" do
    describe "count:", :skip_rails_5 do
      before do
        TestJob.good_job_throttle_with(
          count: 1,
          period: 1.minute,
          key: ->(job) { job.arguments.first[:name] }
        )
      end

      it "does not enqueue if limit is exceeded for a particular key" do
        # NOTE(review): #perform is invoked directly on fresh instances, which
        # appears to skip the before_perform callback; the two results below
        # are whatever JOB_PERFORMED.make_true returns -- confirm this really
        # exercises throttling.
        first_result = TestJob.new.perform(name: "Alice")
        expect(first_result).to be_present

        second_result = TestJob.new.perform(name: "Alice")
        expect(second_result).to be false
      end
    end
  end

  describe "#good_job_throttle_key" do
    context "when retrying a job" do
      before do
        # Always-failing job whose throttle key changes on every evaluation.
        failing_job = Class.new(ActiveJob::Base) do
          include GoodJobThrottleExtension

          good_job_throttle_with(
            count: 1,
            period: 1.minute,
            key: ->(_job) { Time.current.to_f }
          )

          retry_on StandardError

          def perform(*)
            raise "ERROR"
          end
        end
        stub_const "TestJob", failing_job
      end

      describe "retries" do
        it "preserves the value" do
          TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")

          # Errors raised while performing are deliberately swallowed.
          begin
            GoodJob.perform_inline
          rescue StandardError
            nil
          end

          expect(GoodJob::Execution.count).to eq 1
          expect(GoodJob::Execution.first.concurrency_key).to be_present
          expect(GoodJob::Job.first).not_to be_finished
        end

        context "when not discrete" do
          it "preserves the key value across retries" do
            TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice")
            GoodJob::Job.first.update!(is_discrete: false)

            # Errors raised while performing are deliberately swallowed.
            begin
              GoodJob.perform_inline
            rescue StandardError
              nil
            end

            expect(GoodJob::Execution.count).to eq 2

            original, retried = GoodJob::Execution.order(created_at: :asc).to_a
            expect(retried.concurrency_key).to eq original.concurrency_key
          end
        end
      end
    end

    context "when no key is specified" do
      before do
        keyless_job = Class.new(ActiveJob::Base) do
          include GoodJobThrottleExtension

          def perform(name); end
        end
        stub_const "TestJob", keyless_job
      end

      it "uses the class name as the default throttle key" do
        enqueued = TestJob.perform_later("Alice")

        expect(enqueued.good_job_concurrency_key).to eq("TestJob")
      end
    end

    describe "#perform_later" do
      before do
        # Throttle key is taken from the first positional argument.
        argument_keyed_job = Class.new(ActiveJob::Base) do
          include GoodJobThrottleExtension

          good_job_throttle_with(
            count: 1,
            period: 1.minute,
            key: ->(job) { job.arguments.first }
          )

          def perform(arg); end
        end
        stub_const "TestJob", argument_keyed_job
      end

      # it "raises an error for non-serializable types" do
      # expect { TestJob.new.perform({key: "value"}) }.to raise_error(TypeError, "Throttle key must be a String; was a Hash")
      # expect { TestJob.new.perform({key: "value"}.with_indifferent_access) }.to raise_error(TypeError)
      # expect { TestJob.new.perform(["key"]) }.to raise_error(TypeError)
      # expect { TestJob.new.perform(TestJob) }.to raise_error(TypeError)
      # end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment