diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md
index c5d7bcd34b..6a26eb8487 100644
--- a/activejob/CHANGELOG.md
+++ b/activejob/CHANGELOG.md
@@ -1,266 +1,273 @@
-## Rails 6.1.7.7 (February 21, 2024) ##
+## Rails 7.1.3.2 (February 21, 2024) ##
* No changes.
-## Rails 6.1.7.6 (August 22, 2023) ##
+## Rails 7.1.3.1 (February 21, 2024) ##
* No changes.
-## Rails 6.1.7.5 (August 22, 2023) ##
+## Rails 7.1.3 (January 16, 2024) ##
-* No changes.
+* Do not trigger immediate loading of `ActiveJob::Base` when loading `ActiveJob::TestHelper`.
+ *Maxime Réty*
-## Rails 6.1.7.4 (June 26, 2023) ##
+* Preserve the serialized timezone when deserializing `ActiveSupport::TimeWithZone` arguments.
-* No changes.
+ *Joshua Young*
+* Fix ActiveJob arguments serialization to correctly serialize String subclasses having custom serializers.
-## Rails 6.1.7.3 (March 13, 2023) ##
+ *fatkodima*
-* No changes.
-
-## Rails 6.1.7.2 (January 24, 2023) ##
+## Rails 7.1.2 (November 10, 2023) ##
* No changes.
-## Rails 6.1.7.1 (January 17, 2023) ##
+## Rails 7.1.1 (October 11, 2023) ##
-* No changes.
+* Don't log enqueuing details when the job wasn't enqueued.
+ *Dustin Brown*
-## Rails 6.1.7 (September 09, 2022) ##
-
-* No changes.
-
-## Rails 6.1.6.1 (July 12, 2022) ##
+## Rails 7.1.0 (October 05, 2023) ##
* No changes.
-## Rails 6.1.6 (May 09, 2022) ##
+## Rails 7.1.0.rc2 (October 01, 2023) ##
-* No changes.
+* Make sure `scheduled_at` is a Time object when asserting enqueued jobs.
+ *Rafael Mendonça França*
-## Rails 6.1.5.1 (April 26, 2022) ##
-* No changes.
+## Rails 7.1.0.rc1 (September 27, 2023) ##
+* Set `scheduled_at` attribute as a Time object instead of epoch seconds, and serialize and deserialize the value
+ when enqueued. Assigning a numeric/epoch value to scheduled_at= is deprecated; use a Time object instead.
-## Rails 6.1.5 (March 09, 2022) ##
+ Deserializes `enqueued_at` as a Time instead of ISO8601 String.
-* No changes.
+ *Ben Sheldon*
+* Clarify the backoff strategy for the recommended `:wait` option when retrying jobs
-## Rails 6.1.4.7 (March 08, 2022) ##
+ `wait: :exponentially_longer` is waiting polynomially longer, so it is now recommended to use `wait: :polynomially_longer` to keep the same behavior.
-* No changes.
+ *Victor Mours*
-## Rails 6.1.4.6 (February 11, 2022) ##
+## Rails 7.1.0.beta1 (September 13, 2023) ##
-* No changes.
+* Fix Active Job log message to correctly report a job failed to enqueue
+ when the adapter raises an `ActiveJob::EnqueueError`.
+ *Ben Sheldon*
-## Rails 6.1.4.5 (February 11, 2022) ##
+* Add `after_discard` method.
-* No changes.
+ This method lets job authors define a block which will be run when a job is about to be discarded. For example:
+ ```ruby
+ class AfterDiscardJob < ActiveJob::Base
+ after_discard do |job, exception|
+ Rails.logger.info("#{job.class} raised an exception: #{exception}")
+ end
-## Rails 6.1.4.4 (December 15, 2021) ##
+ def perform
+ raise StandardError
+ end
+ end
+ ```
-* No changes.
+ The above job will run the block passed to `after_discard` after the job is discarded. The exception will
+ still be raised after the block has been run.
+ *Rob Cardy*
-## Rails 6.1.4.3 (December 14, 2021) ##
+* Fix deserialization of ActiveSupport::Duration
-* No changes.
+ Previously, a deserialized Duration would return an array from Duration#parts.
+ It will now return a hash just like a regular Duration.
+ This also fixes an error when trying to add or subtract from a deserialized Duration
+ (e.g. `duration + 1.year`).
-## Rails 6.1.4.2 (December 14, 2021) ##
+ *Jonathan del Strother*
-* No changes.
+* `perform_enqueued_jobs` is now compatible with all Active Job adapters
+ This means that methods that depend on it, like Action Mailer's `assert_emails`,
+ will work correctly even if the test adapter is not used.
-## Rails 6.1.4.1 (August 19, 2021) ##
+ *Alex Ghiculescu*
-* No changes.
+* Allow queue adapters to provide a custom name by implementing `queue_adapter_name`
+ *Sander Verdonschot*
-## Rails 6.1.4 (June 24, 2021) ##
+* Log background job enqueue callers
-* No changes.
+ Add `verbose_enqueue_logs` configuration option to display the caller
+ of background job enqueue in the log to help with debugging.
+ Example log line:
-## Rails 6.1.3.2 (May 05, 2021) ##
+ ```
+ Enqueued AvatarThumbnailsJob (Job ID: ab528951-41fb-4c48-9129-3171791c27d6) to Sidekiq(default) with arguments: 1092412064
+ ↳ app/models/user.rb:421:in `generate_avatar_thumbnails'
+ ```
-* No changes.
+ Enabled in development only for new and upgraded applications. Not recommended for use
+ in the production environment since it relies on Ruby's `Kernel#caller` which is fairly slow.
+ *fatkodima*
-## Rails 6.1.3.1 (March 26, 2021) ##
+* Set `provider_job_id` for Backburner jobs
-* No changes.
+ *Cameron Matheson*
+* Add `perform_all_later` to enqueue multiple jobs at once
-## Rails 6.1.3 (February 17, 2021) ##
+ This adds the ability to bulk enqueue jobs, without running callbacks, by
+ passing multiple jobs or an array of jobs. For example:
-* No changes.
+ ```ruby
+ ActiveJob.perform_all_later(MyJob.new("hello", 42), MyJob.new("world", 0))
+ user_jobs = User.pluck(:id).map { |id| UserJob.new(user_id: id) }
+ ActiveJob.perform_all_later(user_jobs)
+ ```
-## Rails 6.1.2.1 (February 10, 2021) ##
+ This can greatly reduce the number of round-trips to the queue datastore.
+ For queue adapters that do not implement the new `enqueue_all` method, we
+ fall back to enqueuing jobs individually. The Sidekiq adapter implements
+ `enqueue_all` with `push_bulk`.
-* No changes.
+ This method does not use the existing `enqueue.active_job` event, but adds a
+ new event `enqueue_all.active_job`.
+ *Sander Verdonschot*
-## Rails 6.1.2 (February 09, 2021) ##
+* Don't double log the `job` when using `ActiveRecord::QueryLog`
-* No changes.
+ Previously if you set `config.active_record.query_log_tags` to an array that included
+ `:job`, the job name would get logged twice. This bug has been fixed.
+ *Alex Ghiculescu*
-## Rails 6.1.1 (January 07, 2021) ##
+* Add support for Sidekiq's transaction-aware client
-* Make `retry_job` return the job that was created.
+ *Jonathan del Strother*
- *Rafael Mendonça França*
+* Remove QueAdapter from Active Job.
-* Include `ActiveSupport::Testing::Assertions` in `ActiveJob::TestHelpers`.
+ After both the Rails and Que sides maintained the Active Job QueAdapter
+ to support Ruby 3 keyword arguments and options provided as top level keywords,
+ it has proved quite difficult to keep maintaining it this way.
- *Mikkel Malmberg*
+ The Active Job Que adapter can be included in a future version of the que gem itself.
+ *Yasuo Honda*
-## Rails 6.1.0 (December 09, 2020) ##
+* Fix BigDecimal (de)serialization for adapters using JSON.
-* Recover nano precision when serializing `Time`, `TimeWithZone` and `DateTime` objects.
+ Previously, BigDecimal was listed as not needing a serializer. However,
+ when used with an adapter storing the job arguments as JSON, it would get
+ serialized as a simple String, resulting in deserialization also producing
+ a String (instead of a BigDecimal).
- *Alan Tan*
+ By using a serializer, we ensure the round trip is safe.
-* Deprecate `config.active_job.return_false_on_aborted_enqueue`.
+ To ensure applications using BigDecimal job arguments are not subject to
+ race conditions during deployment (where a replica running a version of
+ Rails without BigDecimalSerializer fails to deserialize an argument
+ serialized with it), `ActiveJob.use_big_decimal_serializer` is disabled by
+ default, and can be set to true in a following deployment.
- *Rafael Mendonça França*
+ *Sam Bostock*
-* Return `false` when enqueuing a job is aborted.
+* Preserve full-precision `enqueued_at` timestamps for serialized jobs,
+ allowing more accurate reporting of how long a job spent waiting in the
+ queue before it was performed.
- *Rafael Mendonça França*
+ Retains ISO8601 format compatibility.
-* While using `perform_enqueued_jobs` test helper enqueued jobs must be stored for the later check with
- `assert_enqueued_with`.
+ *Jeremy Daer*
- *Dmitry Polushkin*
+* Add `--parent` option to job generator to specify parent class of job.
-* `ActiveJob::TestCase#perform_enqueued_jobs` without a block removes performed jobs from the queue.
+ Example:
- That way the helper can be called multiple times and not perform a job invocation multiple times.
+ `bin/rails g job process_payment --parent=payment_job` generates:
```ruby
- def test_jobs
- HelloJob.perform_later("rafael")
- perform_enqueued_jobs # only runs with "rafael"
- HelloJob.perform_later("david")
- perform_enqueued_jobs # only runs with "david"
+ class ProcessPaymentJob < PaymentJob
+ # ...
end
```
- *Étienne Barrié*
-
-* `ActiveJob::TestCase#perform_enqueued_jobs` will no longer perform retries:
-
- When calling `perform_enqueued_jobs` without a block, the adapter will
- now perform jobs that are **already** in the queue. Jobs that will end up in
- the queue afterwards won't be performed.
-
- This change only affects `perform_enqueued_jobs` when no block is given.
-
- *Edouard Chin*
-
-* Add queue name support to Que adapter.
-
- *Brad Nauta*, *Wojciech Wnętrzak*
-
-* Don't run `after_enqueue` and `after_perform` callbacks if the callback chain is halted.
-
- class MyJob < ApplicationJob
- before_enqueue { throw(:abort) }
- after_enqueue { # won't enter here anymore }
- end
-
- `after_enqueue` and `after_perform` callbacks will no longer run if the callback chain is halted.
- This behaviour is a breaking change and won't take effect until Rails 7.0.
- To enable this behaviour in your app right now, you can add in your app's configuration file
- `config.active_job.skip_after_callbacks_if_terminated = true`.
-
- *Edouard Chin*
-
-* Fix enqueuing and performing incorrect logging message.
-
- Jobs will no longer always log "Enqueued MyJob" or "Performed MyJob" when they actually didn't get enqueued/performed.
-
- ```ruby
- class MyJob < ApplicationJob
- before_enqueue { throw(:abort) }
- end
-
- MyJob.perform_later # Will no longer log "Enqueued MyJob" since job wasn't even enqueued through adapter.
- ```
-
- A new message will be logged in case a job couldn't be enqueued, either because the callback chain was halted or
- because an exception happened during enqueuing. (i.e. Redis is down when you try to enqueue your job)
-
- *Edouard Chin*
-
-* Add an option to disable logging of the job arguments when enqueuing and executing the job.
+ *Gannon McGibbon*
- class SensitiveJob < ApplicationJob
- self.log_arguments = false
+* Add more detailed description to job generator.
- def perform(my_sensitive_argument)
- end
- end
+ *Gannon McGibbon*
- When dealing with sensitive arguments as password and tokens it is now possible to configure the job
- to not put the sensitive argument in the logs.
+* `perform.active_job` notification payloads now include `:db_runtime`, which
+ is the total time (in ms) taken by database queries while performing a job.
+ This value can be used to better understand how a job's time is spent.
- *Rafael Mendonça França*
-
-* Changes in `queue_name_prefix` of a job no longer affects all other jobs.
+ *Jonathan Hefner*
- Fixes #37084.
+* Update `ActiveJob::QueueAdapters::QueAdapter` to remove deprecation warning.
- *Lucas Mansur*
+ Remove a deprecation warning introduced in que 1.2 to prepare for changes in
+ que 2.0 necessary for Ruby 3 compatibility.
-* Allow `Class` and `Module` instances to be serialized.
+ *Damir Zekic* and *Adis Hasovic*
- *Kevin Deisz*
+* Add missing `bigdecimal` require in `ActiveJob::Arguments`
-* Log potential matches in `assert_enqueued_with` and `assert_performed_with`.
+ Could cause `uninitialized constant ActiveJob::Arguments::BigDecimal (NameError)`
+ when loading Active Job in isolation.
- *Gareth du Plooy*
+ *Jean Boussier*
-* Add `at` argument to the `perform_enqueued_jobs` test helper.
+* Allow testing `discard_on/retry_on ActiveJob::DeserializationError`
- *John Crepezzi*, *Eileen Uchitelle*
+ Previously in `perform_enqueued_jobs`, `deserialize_arguments_if_needed`
+ was called before calling `perform_now`. When a record no longer exists
+ and is serialized using GlobalID this led to raising
+ an `ActiveJob::DeserializationError` before reaching `perform_now` call.
+ This behavior makes it difficult to test the job's `discard_on/retry_on` logic.
-* `assert_enqueued_with` and `assert_performed_with` can now test jobs with relative delay.
+ Now `deserialize_arguments_if_needed` call is postponed to when `perform_now`
+ is called.
- *Vlado Cingel*
+ Example:
-* Add jitter to `ActiveJob::Exceptions.retry_on`.
+ ```ruby
+ class UpdateUserJob < ActiveJob::Base
+ discard_on ActiveJob::DeserializationError
- `ActiveJob::Exceptions.retry_on` now uses a random amount of jitter in order to
- prevent the [thundering herd effect](https://en.wikipedia.org/wiki/Thundering_herd_problem). Defaults to
- 15% (represented as 0.15) but overridable via the `:jitter` option when using `retry_on`.
- Jitter is applied when an `Integer`, `ActiveSupport::Duration` or `:exponentially_longer`, is passed to the `wait` argument in `retry_on`.
+ def perform(user)
+ # ...
+ end
+ end
- ```ruby
- retry_on(MyError, wait: :exponentially_longer, jitter: 0.30)
+ # In the test
+ User.destroy_all
+ assert_nothing_raised do
+ perform_enqueued_jobs only: UpdateUserJob
+ end
```
- *Anthony Ross*
-
+ *Jacopo Beschi*
-Please check [6-0-stable](https://github.com/rails/rails/blob/6-0-stable/activejob/CHANGELOG.md) for previous changes.
+Please check [7-0-stable](https://github.com/rails/rails/blob/7-0-stable/activejob/CHANGELOG.md) for previous changes.
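The changelog entry on `wait: :polynomially_longer` says the old `:exponentially_longer` name was misleading because the delay grows polynomially. The sketch below compares the two growth curves in plain Ruby. It assumes the delay is roughly `executions**4 + 2` seconds before jitter. That formula is an assumption not shown in this diff, and both helper names are hypothetical, not Active Job APIs.

```ruby
# Hypothetical helpers for illustration only; these are not Active Job APIs.
# Jitter is left out so the numbers are deterministic.

# Assumed shape of the :polynomially_longer delay: executions**4 plus 2 seconds.
def polynomial_delay(executions)
  (executions**4) + 2
end

# A truly exponential schedule, shown only for contrast.
def exponential_delay(executions)
  2**executions
end

(1..5).each do |n|
  puts "attempt #{n}: polynomial=#{polynomial_delay(n)}s exponential=#{exponential_delay(n)}s"
end
```

Under that assumption the first retries wait about 3, 18, and 83 seconds, which is why the option was renamed rather than having its behavior changed.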
diff --git a/activejob/MIT-LICENSE b/activejob/MIT-LICENSE
index 8a03b1633b..7be9ac633f 100644
--- a/activejob/MIT-LICENSE
+++ b/activejob/MIT-LICENSE
@@ -1,4 +1,4 @@
-Copyright (c) 2014-2022 David Heinemeier Hansson
+Copyright (c) David Heinemeier Hansson
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
@@ -18,3 +18,4 @@ NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
diff --git a/activejob/README.md b/activejob/README.md
index 6bcf68d608..eda7cc7292 100644
--- a/activejob/README.md
+++ b/activejob/README.md
@@ -2,22 +2,22 @@
Active Job is a framework for declaring jobs and making them run on a variety
of queuing backends. These jobs can be everything from regularly scheduled
-clean-ups, to billing charges, to mailings. Anything that can be chopped up into
-small units of work and run in parallel, really.
+clean-ups, to billing charges, to mailings — anything that can be chopped up into
+small units of work and run in parallel.
It also serves as the backend for Action Mailer's #deliver_later functionality
that makes it easy to turn any mailing into a job for running later. That's
one of the most common jobs in a modern web application: sending emails outside
-of the request-response cycle, so the user doesn't have to wait on it.
+the request-response cycle, so the user doesn't have to wait on it.
-The main point is to ensure that all Rails apps will have a job infrastructure
+The main point is to ensure that all \Rails apps will have a job infrastructure
in place, even if it's in the form of an "immediate runner". We can then have
framework features and other gems build on top of that, without having to worry
about API differences between Delayed Job and Resque. Picking your queuing
backend becomes more of an operational concern, then. And you'll be able to
switch between them without having to rewrite your jobs.
-You can read more about Active Job in the [Active Job Basics](https://edgeguides.rubyonrails.org/active_job_basics.html) guide.
+You can read more about Active Job in the [Active Job Basics](https://guides.rubyonrails.org/active_job_basics.html) guide.
## Usage
@@ -104,7 +104,7 @@ The latest version of Active Job can be installed with RubyGems:
$ gem install activejob
```
-Source code can be downloaded as part of the Rails project on GitHub:
+Source code can be downloaded as part of the \Rails project on GitHub:
* https://github.com/rails/rails/tree/main/activejob
@@ -122,7 +122,7 @@ API documentation is at:
* https://api.rubyonrails.org
-Bug reports for the Ruby on Rails project can be filed here:
+Bug reports for the Ruby on \Rails project can be filed here:
* https://github.com/rails/rails/issues
diff --git a/activejob/Rakefile b/activejob/Rakefile
index 6a775a6992..0529c24a7d 100644
--- a/activejob/Rakefile
+++ b/activejob/Rakefile
@@ -2,7 +2,7 @@
require "rake/testtask"
-ACTIVEJOB_ADAPTERS = %w(async inline delayed_job que queue_classic resque sidekiq sneakers sucker_punch backburner test)
+ACTIVEJOB_ADAPTERS = %w(async inline delayed_job queue_classic resque sidekiq sneakers sucker_punch backburner test)
ACTIVEJOB_ADAPTERS.delete("queue_classic") if defined?(JRUBY_VERSION)
task default: :test
@@ -28,7 +28,6 @@ namespace :test do
task "env:integration" do
ENV["AJ_INTEGRATION_TESTS"] = "1"
- ENV["SKIP_REQUIRE_WEBPACKER"] = "true"
end
ACTIVEJOB_ADAPTERS.each do |adapter|
@@ -37,7 +36,10 @@ namespace :test do
Rake::TestTask.new(adapter => "test:env:#{adapter}") do |t|
t.description = "Run adapter tests for #{adapter}"
t.libs << "test"
- t.test_files = FileList["test/cases/**/*_test.rb"]
+ t.test_files = FileList["test/cases/**/*_test.rb"].reject { |x|
+ (x.include?("delayed_job") && adapter != "delayed_job") ||
+ (x.include?("async") && adapter != "async")
+ }
t.verbose = true
t.warning = true
t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
@@ -45,7 +47,10 @@ namespace :test do
namespace :isolated do
task adapter => "test:env:#{adapter}" do
- Dir.glob("#{__dir__}/test/cases/**/*_test.rb").all? do |file|
+ Dir.glob("#{__dir__}/test/cases/**/*_test.rb").reject { |x|
+ (x.include?("delayed_job") && adapter != "delayed_job") ||
+ (x.include?("async") && adapter != "async")
+ }.all? do |file|
sh(Gem.ruby, "-w", "-I#{__dir__}/lib", "-I#{__dir__}/test", file)
end || raise("Failures")
end
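The Rakefile change above stops adapter-specific test files from running under other adapters. A minimal sketch of the same `reject` predicate follows, run against a hypothetical file list; the real task globs `test/cases/**/*_test.rb`, and the `files_for` helper is an illustrative name, not part of the Rakefile.

```ruby
# Hypothetical helper: keep every test file except adapter-specific ones
# that belong to a different adapter (same predicate as the Rakefile block).
def files_for(adapter, files)
  files.reject do |path|
    (path.include?("delayed_job") && adapter != "delayed_job") ||
      (path.include?("async") && adapter != "async")
  end
end

# Hypothetical paths, for illustration only.
files = [
  "test/cases/argument_serialization_test.rb",
  "test/cases/delayed_job_adapter_test.rb",
  "test/cases/async_job_test.rb",
]

puts files_for("sidekiq", files)
```

Because the match is a substring check on the path, any file whose path contains `async` or `delayed_job` is treated as adapter-specific.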
diff --git a/activejob/activejob.gemspec b/activejob/activejob.gemspec
index af671f4695..cf0be7e0ca 100644
--- a/activejob/activejob.gemspec
+++ b/activejob/activejob.gemspec
@@ -9,7 +9,7 @@
s.summary = "Job framework with pluggable queues."
s.description = "Declare job classes that can be run by a variety of queuing backends."
- s.required_ruby_version = ">= 2.5.0"
+ s.required_ruby_version = ">= 2.7.0"
s.license = "MIT"
diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb
index 0b89d70f46..091c234182 100644
--- a/activejob/lib/active_job.rb
+++ b/activejob/lib/active_job.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
#--
-# Copyright (c) 2014-2022 David Heinemeier Hansson
+# Copyright (c) David Heinemeier Hansson
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -26,15 +26,38 @@
require "active_support"
require "active_support/rails"
require "active_job/version"
+require "active_job/deprecator"
require "global_id"
+# :markup: markdown
+# :include: activejob/README.md
module ActiveJob
extend ActiveSupport::Autoload
autoload :Base
autoload :QueueAdapters
- autoload :Serializers
- autoload :ConfiguredJob
+
+ eager_autoload do
+ autoload :Serializers
+ autoload :ConfiguredJob
+ end
+
autoload :TestCase
autoload :TestHelper
+
+ ##
+ # :singleton-method:
+ # If false, \Rails will preserve the legacy serialization of BigDecimal job arguments as Strings.
+ # If true, \Rails will use the new BigDecimalSerializer to (de)serialize BigDecimal losslessly.
+ # Legacy serialization will be removed in \Rails 7.2, along with this config.
+ singleton_class.attr_accessor :use_big_decimal_serializer
+ self.use_big_decimal_serializer = false
+
+ ##
+ # :singleton-method:
+ #
+ # Specifies if the methods calling background job enqueue should be logged below
+ # their relevant enqueue log lines. Defaults to false.
+ singleton_class.attr_accessor :verbose_enqueue_logs
+ self.verbose_enqueue_logs = false
end
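Both new settings above are declared with `singleton_class.attr_accessor`, which defines a reader and writer on the module itself rather than on instances. The sketch below shows that pattern in isolation. `DemoJobs` is a hypothetical stand-in for `ActiveJob`, used so the example runs without Rails.

```ruby
# Hypothetical module standing in for ActiveJob; only the accessor pattern matters.
module DemoJobs
  # Defines DemoJobs.verbose_enqueue_logs and DemoJobs.verbose_enqueue_logs=
  singleton_class.attr_accessor :verbose_enqueue_logs
  self.verbose_enqueue_logs = false # default, as in the diff
end

puts DemoJobs.verbose_enqueue_logs  # default value
DemoJobs.verbose_enqueue_logs = true # e.g. flipped by an initializer
puts DemoJobs.verbose_enqueue_logs
```

The value is global to the process, which suits a framework-wide switch but means tests that change it should restore the previous value.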
diff --git a/activejob/lib/active_job/arguments.rb b/activejob/lib/active_job/arguments.rb
index 0e18bc55c6..4f144c5635 100644
--- a/activejob/lib/active_job/arguments.rb
+++ b/activejob/lib/active_job/arguments.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true
+require "bigdecimal"
require "active_support/core_ext/hash"
module ActiveJob
@@ -7,7 +8,7 @@ module ActiveJob
#
# Wraps the original exception raised as +cause+.
class DeserializationError < StandardError
- def initialize #:nodoc:
+ def initialize # :nodoc:
super("Error while trying to deserialize arguments: #{$!.message}")
set_backtrace $!.backtrace
end
@@ -17,8 +18,8 @@ def initialize #:nodoc:
# currently support String, Integer, Float, NilClass, TrueClass, FalseClass,
# BigDecimal, Symbol, Date, Time, DateTime, ActiveSupport::TimeWithZone,
# ActiveSupport::Duration, Hash, ActiveSupport::HashWithIndifferentAccess,
- # Array or GlobalID::Identification instances, although this can be extended
- # by adding custom serializers.
+ # Array, Range, or GlobalID::Identification instances, although this can be
+ # extended by adding custom serializers.
# Raised if you set the key for a Hash something else than a string or
# a symbol. Also raised when trying to serialize an object which can't be
# identified with a GlobalID - such as an unpersisted Active Record model.
@@ -45,8 +46,6 @@ def deserialize(arguments)
end
private
- # :nodoc:
- PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]
# :nodoc:
GLOBALID_KEY = "_aj_globalid"
# :nodoc:
@@ -66,40 +65,23 @@ def deserialize(arguments)
OBJECT_SERIALIZER_KEY, OBJECT_SERIALIZER_KEY.to_sym,
WITH_INDIFFERENT_ACCESS_KEY, WITH_INDIFFERENT_ACCESS_KEY.to_sym,
]
- private_constant :PERMITTED_TYPES, :RESERVED_KEYS, :GLOBALID_KEY,
+ private_constant :RESERVED_KEYS, :GLOBALID_KEY,
:SYMBOL_KEYS_KEY, :RUBY2_KEYWORDS_KEY, :WITH_INDIFFERENT_ACCESS_KEY
- unless Hash.respond_to?(:ruby2_keywords_hash?) && Hash.respond_to?(:ruby2_keywords_hash)
- using Module.new {
- refine Hash do
- class << Hash
- if RUBY_VERSION >= "2.7"
- def ruby2_keywords_hash?(hash)
- !new(*[hash]).default.equal?(hash)
- end
- else
- def ruby2_keywords_hash?(hash)
- false
- end
- end
-
- def ruby2_keywords_hash(hash)
- _ruby2_keywords_hash(**hash)
- end
-
- private def _ruby2_keywords_hash(*args)
- args.last
- end
- ruby2_keywords(:_ruby2_keywords_hash) if respond_to?(:ruby2_keywords, true)
- end
- end
- }
- end
-
def serialize_argument(argument)
case argument
- when *PERMITTED_TYPES
+ when nil, true, false, Integer, Float # Types that can hardly be subclassed
argument
+ when String
+ if argument.class == String
+ argument
+ else
+ begin
+ Serializers.serialize(argument)
+ rescue SerializationError
+ argument
+ end
+ end
when GlobalID::Identification
convert_to_global_id_hash(argument)
when Array
@@ -116,18 +98,29 @@ def serialize_argument(argument)
result = serialize_hash(argument)
result[aj_hash_key] = symbol_keys
result
- when -> (arg) { arg.respond_to?(:permitted?) }
- serialize_indifferent_hash(argument.to_h)
else
- Serializers.serialize(argument)
+ if argument.respond_to?(:permitted?) && argument.respond_to?(:to_h)
+ serialize_indifferent_hash(argument.to_h)
+ elsif BigDecimal === argument && !ActiveJob.use_big_decimal_serializer
+ ActiveJob.deprecator.warn(<<~MSG)
+ Primitive serialization of BigDecimal job arguments is deprecated as it may serialize via .to_s using certain queue adapters.
+ Enable config.active_job.use_big_decimal_serializer to use BigDecimalSerializer instead, which will be mandatory in Rails 7.2.
+
+ Note that if your application has multiple replicas, you should only enable this setting after successfully deploying your app to Rails 7.1 first.
+ This will ensure that during your deployment all replicas are capable of deserializing arguments serialized with BigDecimalSerializer.
+ MSG
+ argument
+ else
+ Serializers.serialize(argument)
+ end
end
end
def deserialize_argument(argument)
case argument
- when String
+ when nil, true, false, String, Integer, Float
argument
- when *PERMITTED_TYPES
+ when BigDecimal # BigDecimal may have been legacy serialized; Remove in 7.2
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
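The new `when String` branch in `serialize_argument` checks `argument.class == String` because `case`/`when` uses `===`, which also matches subclasses. A small sketch of that distinction follows. `Slug` and `classify` are hypothetical names chosen for illustration; the real method hands subclasses to `Serializers.serialize` instead of returning a symbol.

```ruby
# Hypothetical String subclass, standing in for a type with a custom serializer.
class Slug < String; end

# Hypothetical helper mirroring the branch structure, not the real serializer.
def classify(argument)
  case argument
  when String
    # `when String` also matches subclasses, so compare the exact class.
    argument.class == String ? :plain_string : :string_subclass
  else
    :other
  end
end

puts classify("hello")           # a plain String
puts classify(Slug.new("hello")) # a subclass instance
```

Without the exact-class check, a `Slug` would be passed through as a bare string and its custom serializer would never be consulted, which is the bug the changelog describes.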
diff --git a/activejob/lib/active_job/base.rb b/activejob/lib/active_job/base.rb
index da3cd89f2c..0fdef4354f 100644
--- a/activejob/lib/active_job/base.rb
+++ b/activejob/lib/active_job/base.rb
@@ -14,8 +14,8 @@
require "active_job/timezones"
require "active_job/translation"
-module ActiveJob #:nodoc:
- # = Active Job
+module ActiveJob # :nodoc:
+ # = Active Job \Base
#
# Active Job objects can be configured to work with different backend
# queuing frameworks. To specify a queue adapter to use:
@@ -69,8 +69,8 @@ class Base
include Execution
include Callbacks
include Exceptions
- include Logging
include Instrumentation
+ include Logging
include Timezones
include Translation
diff --git a/activejob/lib/active_job/callbacks.rb b/activejob/lib/active_job/callbacks.rb
index a5f82bd470..78f90a338e 100644
--- a/activejob/lib/active_job/callbacks.rb
+++ b/activejob/lib/active_job/callbacks.rb
@@ -1,11 +1,10 @@
# frozen_string_literal: true
require "active_support/callbacks"
-require "active_support/core_ext/object/with_options"
require "active_support/core_ext/module/attribute_accessors"
module ActiveJob
- # = Active Job Callbacks
+ # = Active Job \Callbacks
#
# Active Job provides hooks during the life cycle of a job. Callbacks allow you
# to trigger logic during this cycle. Available callbacks are:
@@ -29,25 +28,13 @@ class << self
end
included do
- class_attribute :return_false_on_aborted_enqueue, instance_accessor: false, instance_predicate: false, default: false
- singleton_class.deprecate :return_false_on_aborted_enqueue, :return_false_on_aborted_enqueue=
- cattr_accessor :skip_after_callbacks_if_terminated, instance_accessor: false, default: false
-
- with_options(skip_after_callbacks_if_terminated: skip_after_callbacks_if_terminated) do
- define_callbacks :perform
- define_callbacks :enqueue
- end
+ define_callbacks :perform, skip_after_callbacks_if_terminated: true
+ define_callbacks :enqueue, skip_after_callbacks_if_terminated: true
end
# These methods will be included into any Active Job object, adding
# callbacks for +perform+ and +enqueue+ methods.
module ClassMethods
- def inherited(klass)
- klass.get_callbacks(:enqueue).config[:skip_after_callbacks_if_terminated] = skip_after_callbacks_if_terminated
- klass.get_callbacks(:perform).config[:skip_after_callbacks_if_terminated] = skip_after_callbacks_if_terminated
- super
- end
-
# Defines a callback that will get called right before the
# job's perform method is executed.
#
@@ -145,7 +132,8 @@ def before_enqueue(*filters, &blk)
# queue_as :default
#
# after_enqueue do |job|
- # $statsd.increment "enqueue-video-job.success"
+ # result = job.successfully_enqueued? ? "success" : "failure"
+ # $statsd.increment "enqueue-video-job.#{result}"
# end
#
# def perform(video_id)
@@ -178,21 +166,5 @@ def around_enqueue(*filters, &blk)
set_callback(:enqueue, :around, *filters, &blk)
end
end
-
- private
- def halted_callback_hook(_filter, name) # :nodoc:
- return super unless %i(enqueue perform).include?(name.to_sym)
- callbacks = public_send("_#{name}_callbacks")
-
- if !self.class.skip_after_callbacks_if_terminated && callbacks.any? { |c| c.kind == :after }
- ActiveSupport::Deprecation.warn(<<~EOM)
- In Rails 7.0, `after_enqueue`/`after_perform` callbacks no longer run if `before_enqueue`/`before_perform` respectively halts with `throw :abort`.
- To enable this behavior, uncomment the `config.active_job.skip_after_callbacks_if_terminated` config
- in the new 6.1 framework defaults initializer.
- EOM
- end
-
- super
- end
end
end
diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb
index 311837247c..7d8aafe3a1 100644
--- a/activejob/lib/active_job/configured_job.rb
+++ b/activejob/lib/active_job/configured_job.rb
@@ -1,20 +1,22 @@
# frozen_string_literal: true
module ActiveJob
- class ConfiguredJob #:nodoc:
+ class ConfiguredJob # :nodoc:
def initialize(job_class, options = {})
@options = options
@job_class = job_class
end
- def perform_now(*args)
- @job_class.new(*args).perform_now
+ def perform_now(...)
+ @job_class.new(...).set(@options).perform_now
end
- ruby2_keywords(:perform_now) if respond_to?(:ruby2_keywords, true)
- def perform_later(*args)
- @job_class.new(*args).enqueue @options
+ def perform_later(...)
+ @job_class.new(...).enqueue @options
+ end
+
+ def perform_all_later(multi_args)
+ @job_class.perform_all_later(multi_args, options: @options)
end
- ruby2_keywords(:perform_later) if respond_to?(:ruby2_keywords, true)
end
end
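`ConfiguredJob` now forwards arguments with `(...)` instead of `*args` plus `ruby2_keywords`. The sketch below shows that forwarding style on its own, assuming Ruby 2.7 or newer (the minimum this diff sets in the gemspec). `GreetingJob` and `Wrapper` are hypothetical classes, not part of Active Job.

```ruby
# Hypothetical job class; it only records what it was built with.
class GreetingJob
  attr_reader :name, :greeting

  def initialize(name, greeting: "Hello")
    @name = name
    @greeting = greeting
  end

  def perform_now
    "#{greeting}, #{name}!"
  end
end

# Hypothetical wrapper mirroring the shape of ConfiguredJob#perform_now.
class Wrapper
  def initialize(job_class)
    @job_class = job_class
  end

  # `...` forwards positional, keyword, and block arguments unchanged.
  def perform_now(...)
    @job_class.new(...).perform_now
  end
end

puts Wrapper.new(GreetingJob).perform_now("Ada", greeting: "Hi")
```

Keyword arguments reach the job's initializer as keywords, so no `ruby2_keywords` marker is needed on the wrapper.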
diff --git a/activejob/lib/active_job/core.rb b/activejob/lib/active_job/core.rb
index e83d2b2679..3a6c4cd280 100644
--- a/activejob/lib/active_job/core.rb
+++ b/activejob/lib/active_job/core.rb
@@ -1,6 +1,8 @@
# frozen_string_literal: true
module ActiveJob
+ # = Active Job \Core
+ #
# Provides general behavior that will be included into every Active Job
# object that inherits from ActiveJob::Base.
module Core
@@ -10,8 +12,10 @@ module Core
attr_accessor :arguments
attr_writer :serialized_arguments
- # Timestamp when the job should be performed
- attr_accessor :scheduled_at
+ # Time when the job should be performed
+ attr_reader :scheduled_at
+
+ attr_reader :_scheduled_at_time # :nodoc:
# Job Identifier
attr_accessor :job_id
@@ -43,6 +47,16 @@ module Core
# Track when a job was enqueued
attr_accessor :enqueued_at
+ # Track whether the adapter received the job successfully.
+ attr_writer :successfully_enqueued # :nodoc:
+
+ def successfully_enqueued?
+ @successfully_enqueued
+ end
+
+ # Track any exceptions raised by the backend so callers can inspect the errors.
+ attr_accessor :enqueue_error
+
# These methods will be included into any Active Job object, adding
# helpers for de/serialization and creation of job instances.
module ClassMethods
@@ -82,12 +96,14 @@ def initialize(*arguments)
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
+ @scheduled_at = nil
+ @_scheduled_at_time = nil
@priority = self.class.priority
@executions = 0
@exception_executions = {}
@timezone = Time.zone&.name
end
- ruby2_keywords(:initialize) if respond_to?(:ruby2_keywords, true)
+ ruby2_keywords(:initialize)
# Returns a hash with the job data that can safely be passed to the
# queuing adapter.
@@ -103,7 +119,8 @@ def serialize
"exception_executions" => exception_executions,
"locale" => I18n.locale.to_s,
"timezone" => timezone,
- "enqueued_at" => Time.now.utc.iso8601
+ "enqueued_at" => Time.now.utc.iso8601(9),
+ "scheduled_at" => _scheduled_at_time ? _scheduled_at_time.utc.iso8601(9) : nil,
}
end
@@ -143,7 +160,30 @@ def deserialize(job_data)
self.exception_executions = job_data["exception_executions"]
self.locale = job_data["locale"] || I18n.locale.to_s
self.timezone = job_data["timezone"] || Time.zone&.name
- self.enqueued_at = job_data["enqueued_at"]
+ self.enqueued_at = Time.iso8601(job_data["enqueued_at"]) if job_data["enqueued_at"]
+ self.scheduled_at = Time.iso8601(job_data["scheduled_at"]) if job_data["scheduled_at"]
+ end
+
+ # Configures the job with the given options.
+ def set(options = {}) # :nodoc:
+ self.scheduled_at = options[:wait].seconds.from_now if options[:wait]
+ self.scheduled_at = options[:wait_until] if options[:wait_until]
+ self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
+ self.priority = options[:priority].to_i if options[:priority]
+
+ self
+ end
+
+ def scheduled_at=(value)
+ @_scheduled_at_time = if value&.is_a?(Numeric)
+ ActiveJob.deprecator.warn(<<~MSG.squish)
+ Assigning a numeric/epoch value to scheduled_at is deprecated. Use a Time object instead.
+ MSG
+ Time.at(value)
+ else
+ value
+ end
+ @scheduled_at = value
end
private
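Note on the core.rb hunks above: `enqueued_at` and `scheduled_at` are now serialized as ISO 8601 strings with nine fractional digits and parsed back into `Time` objects on deserialize. A minimal sketch of that round trip in plain Ruby follows; `serialize_time` and `deserialize_time` are hypothetical helper names used only for illustration, not Active Job methods.

```ruby
require "time"

# Hypothetical helpers mirroring the serialize/deserialize lines in the diff.
# A nil timestamp stays nil in both directions.
def serialize_time(time)
  # getutc returns a new Time in UTC and leaves the receiver untouched.
  time&.getutc&.iso8601(9)
end

def deserialize_time(string)
  string && Time.iso8601(string)
end

scheduled_at = Time.at(1_700_000_000, 123_456_789, :nsec)
payload      = serialize_time(scheduled_at)
restored     = deserialize_time(payload)

puts payload
puts restored == scheduled_at # nanosecond precision survives the round trip
```

Nine digits are enough to carry the full nanosecond resolution of a Ruby `Time`, which is presumably why the diff moves from `iso8601` to `iso8601(9)`.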
diff --git a/activejob/lib/active_job/deprecator.rb b/activejob/lib/active_job/deprecator.rb
new file mode 100644
index 0000000000..81975a41b6
--- /dev/null
+++ b/activejob/lib/active_job/deprecator.rb
@@ -0,0 +1,7 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ def self.deprecator # :nodoc:
+ @deprecator ||= ActiveSupport::Deprecation.new
+ end
+end
diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb
index e2077613e3..a805a53f62 100644
--- a/activejob/lib/active_job/enqueuing.rb
+++ b/activejob/lib/active_job/enqueuing.rb
@@ -4,6 +4,41 @@
module ActiveJob
# Provides behavior for enqueuing jobs.
+
+ # Can be raised by adapters if they wish to communicate to the caller a reason
+ # why the adapter was unexpectedly unable to enqueue a job.
+ class EnqueueError < StandardError; end
+
+ class << self
+ # Push many jobs onto the queue at once without running enqueue callbacks.
+ # Queue adapters may communicate the enqueue status of each job by setting
+ # successfully_enqueued and/or enqueue_error on the passed-in job instances.
+ def perform_all_later(*jobs)
+ jobs.flatten!
+ jobs.group_by(&:queue_adapter).each do |queue_adapter, adapter_jobs|
+ instrument_enqueue_all(queue_adapter, adapter_jobs) do
+ if queue_adapter.respond_to?(:enqueue_all)
+ queue_adapter.enqueue_all(adapter_jobs)
+ else
+ adapter_jobs.each do |job|
+ job.successfully_enqueued = false
+ if job.scheduled_at
+ queue_adapter.enqueue_at(job, job._scheduled_at_time.to_f)
+ else
+ queue_adapter.enqueue(job)
+ end
+ job.successfully_enqueued = true
+ rescue EnqueueError => e
+ job.enqueue_error = e
+ end
+ adapter_jobs.count(&:successfully_enqueued?)
+ end
+ end
+ end
+ nil
+ end
+ end
+
module Enqueuing
extend ActiveSupport::Concern
@@ -12,22 +47,28 @@ module ClassMethods
# Push a job onto the queue. By default the arguments must be either String,
# Integer, Float, NilClass, TrueClass, FalseClass, BigDecimal, Symbol, Date,
# Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration,
- # Hash, ActiveSupport::HashWithIndifferentAccess, Array or
+ # Hash, ActiveSupport::HashWithIndifferentAccess, Array, Range, or
# GlobalID::Identification instances, although this can be extended by adding
# custom serializers.
#
# Returns an instance of the job class queued with arguments available in
- # Job#arguments.
- def perform_later(*args)
- job_or_instantiate(*args).enqueue
+ # Job#arguments or false if the enqueue did not succeed.
+ #
+ # After the attempted enqueue, the job will be yielded to an optional block.
+ def perform_later(...)
+ job = job_or_instantiate(...)
+ enqueue_result = job.enqueue
+
+ yield job if block_given?
+
+ enqueue_result
end
- ruby2_keywords(:perform_later) if respond_to?(:ruby2_keywords, true)
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
- ruby2_keywords(:job_or_instantiate) if respond_to?(:ruby2_keywords, true)
+ ruby2_keywords(:job_or_instantiate)
end
# Enqueues the job to be performed by the queue adapter.
@@ -46,23 +87,22 @@ def job_or_instantiate(*args) # :doc:
# my_job_instance.enqueue wait_until: Date.tomorrow.midnight
# my_job_instance.enqueue priority: 10
def enqueue(options = {})
- self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
- self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
- self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
- self.priority = options[:priority].to_i if options[:priority]
- successfully_enqueued = false
+ set(options)
+ self.successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
- queue_adapter.enqueue_at self, scheduled_at
+ queue_adapter.enqueue_at self, _scheduled_at_time.to_f
else
queue_adapter.enqueue self
end
- successfully_enqueued = true
+ self.successfully_enqueued = true
+ rescue EnqueueError => e
+ self.enqueue_error = e
end
- if successfully_enqueued
+ if successfully_enqueued?
self
else
false
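Note on the enqueuing.rb hunks above: when an adapter has no `enqueue_all`, `perform_all_later` falls back to enqueuing each job individually, records an `EnqueueError` on the job instead of raising, and counts the successes. The sketch below reproduces only that fallback loop in plain Ruby. `Job`, `FlakyAdapter` and `enqueue_each` are hypothetical stand-ins, and unlike the real method (which returns `nil`) the sketch returns the count so it is easy to inspect.

```ruby
# Hypothetical stand-ins for illustration; these are not Active Job classes.
class EnqueueError < StandardError; end

Job = Struct.new(:name, :queue_adapter, :scheduled_at,
                 :successfully_enqueued, :enqueue_error)

class FlakyAdapter
  attr_reader :enqueued

  def initialize
    @enqueued = []
  end

  def enqueue(job)
    raise EnqueueError, "queue full" if job.name == "bad"
    @enqueued << job.name
  end

  def enqueue_at(job, _timestamp)
    enqueue(job)
  end
end

# Groups jobs by adapter, enqueues them one by one, and returns how many
# were accepted. Failures are stored on the job rather than raised.
def enqueue_each(*jobs)
  jobs.flatten.group_by(&:queue_adapter).sum do |adapter, adapter_jobs|
    adapter_jobs.each do |job|
      job.successfully_enqueued = false
      begin
        if job.scheduled_at
          adapter.enqueue_at(job, job.scheduled_at.to_f)
        else
          adapter.enqueue(job)
        end
        job.successfully_enqueued = true
      rescue EnqueueError => e
        job.enqueue_error = e
      end
    end
    adapter_jobs.count(&:successfully_enqueued)
  end
end

adapter = FlakyAdapter.new
jobs = [Job.new("a", adapter), Job.new("bad", adapter), Job.new("c", adapter, Time.now)]
puts enqueue_each(jobs)
```

Callers can then inspect `successfully_enqueued` and `enqueue_error` on each job to find out which ones were rejected.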
diff --git a/activejob/lib/active_job/exceptions.rb b/activejob/lib/active_job/exceptions.rb
index 9ff305dedc..0f4a1eee47 100644
--- a/activejob/lib/active_job/exceptions.rb
+++ b/activejob/lib/active_job/exceptions.rb
@@ -9,6 +9,7 @@ module Exceptions
included do
class_attribute :retry_jitter, instance_accessor: false, instance_predicate: false, default: 0.0
+ class_attribute :after_discard_procs, default: []
end
module ClassMethods
@@ -20,12 +21,16 @@ module ClassMethods
# You can also pass a block that'll be invoked if the retry attempts fail for custom logic rather than letting
# the exception bubble up. This block is yielded with the job instance as the first and the error instance as the second parameter.
#
+ # `retry_on` and `discard_on` handlers are searched from bottom to top, and up the class hierarchy. The handler of the first class for
+ # which <tt>exception.is_a?(klass)</tt> holds true is the one invoked, if any.
+ #
# ==== Options
# * <tt>:wait</tt> - Re-enqueues the job with a delay specified either in seconds (default: 3 seconds),
# as a computing proc that takes the number of executions so far as an argument, or as a symbol reference of
- # <tt>:exponentially_longer</tt>, which applies the wait algorithm of <tt>((executions**4) + (Kernel.rand * (executions**4) * jitter)) + 2</tt>
+ # <tt>:polynomially_longer</tt>, which applies the wait algorithm of <tt>((executions**4) + (Kernel.rand * (executions**4) * jitter)) + 2</tt>
# (first wait ~3s, then ~18s, then ~83s, etc)
- # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts)
+ # * <tt>:attempts</tt> - Re-enqueues the job the specified number of times (default: 5 attempts) or a symbol reference of <tt>:unlimited</tt>
+ # to retry the job until it succeeds
# * <tt>:queue</tt> - Re-enqueues the job on a different queue
# * <tt>:priority</tt> - Re-enqueues the job with a different priority
# * <tt>:jitter</tt> - A random delay of wait time used when calculating backoff. The default is 15% (0.15) which represents the upper bound of possible wait time (expressed as a percentage)
@@ -35,13 +40,14 @@ module ClassMethods
# class RemoteServiceJob < ActiveJob::Base
# retry_on CustomAppException # defaults to ~3s wait, 5 attempts
# retry_on AnotherCustomAppException, wait: ->(executions) { executions * 2 }
+ # retry_on CustomInfrastructureException, wait: 5.minutes, attempts: :unlimited
#
# retry_on ActiveRecord::Deadlocked, wait: 5.seconds, attempts: 3
- # retry_on Net::OpenTimeout, Timeout::Error, wait: :exponentially_longer, attempts: 10 # retries at most 10 times for Net::OpenTimeout and Timeout::Error combined
+ # retry_on Net::OpenTimeout, Timeout::Error, wait: :polynomially_longer, attempts: 10 # retries at most 10 times for Net::OpenTimeout and Timeout::Error combined
# # To retry at most 10 times for each individual exception:
- # # retry_on Net::OpenTimeout, wait: :exponentially_longer, attempts: 10
+ # # retry_on Net::OpenTimeout, wait: :polynomially_longer, attempts: 10
# # retry_on Net::ReadTimeout, wait: 5.seconds, jitter: 0.30, attempts: 10
- # # retry_on Timeout::Error, wait: :exponentially_longer, attempts: 10
+ # # retry_on Timeout::Error, wait: :polynomially_longer, attempts: 10
#
# retry_on(YetAnotherCustomAppException) do |job, error|
# ExceptionNotifier.caught(error)
@@ -54,17 +60,25 @@ module ClassMethods
# end
# end
def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: nil, jitter: JITTER_DEFAULT)
+ if wait == :exponentially_longer
+ ActiveJob.deprecator.warn(<<~MSG.squish)
+ `wait: :exponentially_longer` will actually wait polynomially longer and is therefore deprecated.
+ Prefer `wait: :polynomially_longer` to avoid confusion and keep the same behavior.
+ MSG
+ end
rescue_from(*exceptions) do |error|
executions = executions_for(exceptions)
- if executions < attempts
+ if attempts == :unlimited || executions < attempts
retry_job wait: determine_delay(seconds_or_duration_or_algorithm: wait, executions: executions, jitter: jitter), queue: queue, priority: priority, error: error
else
if block_given?
instrument :retry_stopped, error: error do
yield self, error
end
+ run_after_discard_procs(error)
else
instrument :retry_stopped, error: error
+ run_after_discard_procs(error)
raise error
end
end
@@ -76,6 +90,9 @@ def retry_on(*exceptions, wait: 3.seconds, attempts: 5, queue: nil, priority: ni
#
# You can also pass a block that'll be invoked. This block is yielded with the job instance as the first and the error instance as the second parameter.
#
+ # `retry_on` and `discard_on` handlers are searched from bottom to top, and up the class hierarchy. The handler of the first class for
+ # which <tt>exception.is_a?(klass)</tt> holds true is the one invoked, if any.
+ #
# ==== Example
#
# class SearchIndexingJob < ActiveJob::Base
@@ -93,14 +110,32 @@ def discard_on(*exceptions)
rescue_from(*exceptions) do |error|
instrument :discard, error: error do
yield self, error if block_given?
+ run_after_discard_procs(error)
end
end
end
+
+ # A block to run when a job is about to be discarded for any reason.
+ #
+ # ==== Example
+ #
+ # class WorkJob < ActiveJob::Base
+ # after_discard do |job, exception|
+ # ExceptionNotifier.report(exception)
+ # end
+ #
+ # ...
+ #
+ # end
+ def after_discard(&blk)
+ self.after_discard_procs += [blk]
+ end
end
- # Reschedules the job to be re-executed. This is useful in combination
- # with the +rescue_from+ option. When you rescue an exception from your job
- # you can ask Active Job to retry performing your job.
+ # Reschedules the job to be re-executed. This is useful in combination with
+ # {rescue_from}[rdoc-ref:ActiveSupport::Rescuable::ClassMethods#rescue_from].
+ # When you rescue an exception from your job you can ask Active Job to retry
+ # performing your job.
#
# ==== Options
# * <tt>:wait</tt> - Enqueues the job with the specified delay in seconds
@@ -133,7 +168,8 @@ def determine_delay(seconds_or_duration_or_algorithm:, executions:, jitter: JITT
jitter = jitter == JITTER_DEFAULT ? self.class.retry_jitter : (jitter || 0.0)
case seconds_or_duration_or_algorithm
- when :exponentially_longer
+ when :exponentially_longer, :polynomially_longer
+ # This delay uses a polynomial backoff strategy, which was previously misnamed as exponential
delay = executions**4
delay_jitter = determine_jitter_for_delay(delay, jitter)
delay + delay_jitter + 2
@@ -162,5 +198,15 @@ def executions_for(exceptions)
executions
end
end
+
+ def run_after_discard_procs(exception)
+ exceptions = []
+ after_discard_procs.each do |blk|
+ instance_exec(self, exception, &blk)
+ rescue StandardError => e
+ exceptions << e
+ end
+ raise exceptions.last unless exceptions.empty?
+ end
end
end
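Note on the exceptions.rb hunks above: the rename from `:exponentially_longer` to `:polynomially_longer` reflects that the delay grows with the fourth power of the execution count. The formula from the `retry_on` documentation can be checked with a small plain-Ruby sketch; `polynomial_delay` and its `random:` parameter are hypothetical names, introduced so the jitter term can be pinned down.

```ruby
# Delay formula from the retry_on docs:
#   ((executions**4) + (Kernel.rand * (executions**4) * jitter)) + 2
# `random` is injectable only so the result can be made deterministic.
def polynomial_delay(executions, jitter: 0.15, random: Kernel.rand)
  delay = executions**4
  delay + (random * delay * jitter) + 2
end

(1..5).each do |n|
  puts "attempt #{n}: #{polynomial_delay(n, jitter: 0.0)}s before jitter"
end
```

With jitter set to zero, the first three waits come out at 3, 18 and 83 seconds, matching the "~3s, then ~18s, then ~83s" note in the documentation comment.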
diff --git a/activejob/lib/active_job/execution.rb b/activejob/lib/active_job/execution.rb
index 5397b1dbb2..5a15607871 100644
--- a/activejob/lib/active_job/execution.rb
+++ b/activejob/lib/active_job/execution.rb
@@ -4,6 +4,12 @@
require "active_job/arguments"
module ActiveJob
+ # = Active Job \Execution
+ #
+ # Provides methods to execute jobs immediately, and wraps job execution so
+ # that exceptions configured with
+ # {rescue_from}[rdoc-ref:ActiveSupport::Rescuable::ClassMethods#rescue_from]
+ # are handled.
module Execution
extend ActiveSupport::Concern
include ActiveSupport::Rescuable
@@ -14,12 +20,11 @@ module ClassMethods
#
# MyJob.perform_now("mike")
#
- def perform_now(*args)
- job_or_instantiate(*args).perform_now
+ def perform_now(...)
+ job_or_instantiate(...).perform_now
end
- ruby2_keywords(:perform_now) if respond_to?(:ruby2_keywords, true)
- def execute(job_data) #:nodoc:
+ def execute(job_data) # :nodoc:
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
@@ -44,15 +49,25 @@ def perform_now
deserialize_arguments_if_needed
- run_callbacks :perform do
- perform(*arguments)
- end
- rescue => exception
- rescue_with_handler(exception) || raise
+ _perform_job
+ rescue Exception => exception
+ handled = rescue_with_handler(exception)
+ return handled if handled
+
+ run_after_discard_procs(exception)
+ raise
end
def perform(*)
fail NotImplementedError
end
+
+ private
+ def _perform_job
+ ActiveSupport::ExecutionContext[:job] = self
+ run_callbacks :perform do
+ perform(*arguments)
+ end
+ end
end
end
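Note on the execution.rb hunks above: when no `rescue_from` handler claims the exception, `perform_now` now runs every `after_discard` proc before re-raising. Each proc is run even if an earlier one fails, and the last hook failure, if any, is raised in place of the original error. The following plain-Ruby sketch mimics that control flow. `SketchJob` and `FailingJob` are hypothetical classes, and the sketch leaves out `rescue_with_handler` and the `class_attribute` machinery.

```ruby
# Hypothetical base class; a simplified imitation of the flow in the diff.
class SketchJob
  class << self
    def after_discard_procs
      @after_discard_procs ||= []
    end

    def after_discard(&blk)
      after_discard_procs << blk
    end
  end

  def perform_now
    perform
  rescue Exception => exception
    run_after_discard_procs(exception)
    raise
  end

  def perform
    raise NotImplementedError
  end

  private
    # Runs every proc, collecting hook failures so one broken hook does not
    # prevent the others from running; the last failure is raised afterwards.
    def run_after_discard_procs(exception)
      exceptions = []
      self.class.after_discard_procs.each do |blk|
        instance_exec(self, exception, &blk)
      rescue StandardError => e
        exceptions << e
      end
      raise exceptions.last unless exceptions.empty?
    end
end

class FailingJob < SketchJob
  REPORTED = []

  after_discard { |_job, _error| raise "reporter down" }
  after_discard { |job, error| REPORTED << [job.class.name, error.message] }

  def perform
    raise ArgumentError, "bad input"
  end
end

begin
  FailingJob.new.perform_now
rescue => e
  puts "#{e.class}: #{e.message}"
  puts FailingJob::REPORTED.inspect
end
```

In this sketch the second hook still records the original `ArgumentError` even though the first hook raised, and the caller sees the hook's own error.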
diff --git a/activejob/lib/active_job/gem_version.rb b/activejob/lib/active_job/gem_version.rb
index 344047da0d..05fe72cd80 100644
--- a/activejob/lib/active_job/gem_version.rb
+++ b/activejob/lib/active_job/gem_version.rb
@@ -1,16 +1,16 @@
# frozen_string_literal: true
module ActiveJob
- # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
+ # Returns the currently loaded version of Active Job as a +Gem::Version+.
def self.gem_version
Gem::Version.new VERSION::STRING
end
module VERSION
- MAJOR = 6
+ MAJOR = 7
MINOR = 1
- TINY = 7
- PRE = "7"
+ TINY = 3
+ PRE = "2"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
diff --git a/activejob/lib/active_job/instrumentation.rb b/activejob/lib/active_job/instrumentation.rb
index c5f4e017e6..8e84f8f250 100644
--- a/activejob/lib/active_job/instrumentation.rb
+++ b/activejob/lib/active_job/instrumentation.rb
@@ -1,35 +1,47 @@
# frozen_string_literal: true
module ActiveJob
- module Instrumentation #:nodoc:
+ class << self
+ private
+ def instrument_enqueue_all(queue_adapter, jobs)
+ payload = { adapter: queue_adapter, jobs: jobs }
+ ActiveSupport::Notifications.instrument("enqueue_all.active_job", payload) do
+ result = yield payload
+ payload[:enqueued_count] = result
+ result
+ end
+ end
+ end
+
+ module Instrumentation # :nodoc:
extend ActiveSupport::Concern
included do
around_enqueue do |_, block|
scheduled_at ? instrument(:enqueue_at, &block) : instrument(:enqueue, &block)
end
+ end
- around_perform do |_, block|
- instrument :perform_start
- instrument :perform, &block
- end
+ def perform_now
+ instrument(:perform) { super }
end
private
- def instrument(operation, payload = {}, &block)
- enhanced_block = ->(event_payload) do
- value = block.call if block
+ def _perform_job
+ instrument(:perform_start)
+ super
+ end
- if defined?(@_halted_callback_hook_called) && @_halted_callback_hook_called
- event_payload[:aborted] = true
- @_halted_callback_hook_called = nil
- end
+ def instrument(operation, payload = {}, &block)
+ payload[:job] = self
+ payload[:adapter] = queue_adapter
+ ActiveSupport::Notifications.instrument("#{operation}.active_job", payload) do
+ value = block.call if block
+ payload[:aborted] = @_halted_callback_hook_called if defined?(@_halted_callback_hook_called)
+ @_halted_callback_hook_called = nil
value
end
-
- ActiveSupport::Notifications.instrument \
- "#{operation}.active_job", payload.merge(adapter: queue_adapter, job: self), &enhanced_block
end
def halted_callback_hook(*)
diff --git a/activejob/lib/active_job/log_subscriber.rb b/activejob/lib/active_job/log_subscriber.rb
index 6fc32ca3c9..6d7bc987b2 100644
--- a/activejob/lib/active_job/log_subscriber.rb
+++ b/activejob/lib/active_job/log_subscriber.rb
@@ -1,13 +1,14 @@
# frozen_string_literal: true
-require "active_support/core_ext/string/filters"
require "active_support/log_subscriber"
module ActiveJob
- class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
+ class LogSubscriber < ActiveSupport::LogSubscriber # :nodoc:
+ class_attribute :backtrace_cleaner, default: ActiveSupport::BacktraceCleaner.new
+
def enqueue(event)
job = event.payload[:job]
- ex = event.payload[:exception_object]
+ ex = event.payload[:exception_object] || job.enqueue_error
if ex
error do
@@ -23,10 +24,11 @@ def enqueue(event)
end
end
end
+ subscribe_log_level :enqueue, :info
def enqueue_at(event)
job = event.payload[:job]
- ex = event.payload[:exception_object]
+ ex = event.payload[:exception_object] || job.enqueue_error
if ex
error do
@@ -42,13 +44,44 @@ def enqueue_at(event)
end
end
end
+ subscribe_log_level :enqueue_at, :info
+
+ def enqueue_all(event)
+ info do
+ jobs = event.payload[:jobs]
+ adapter = event.payload[:adapter]
+ enqueued_count = event.payload[:enqueued_count]
+
+ if enqueued_count == jobs.size
+ enqueued_jobs_message(adapter, jobs)
+ elsif jobs.any?(&:successfully_enqueued?)
+ enqueued_jobs = jobs.select(&:successfully_enqueued?)
+
+ failed_enqueue_count = jobs.size - enqueued_count
+ if failed_enqueue_count == 0
+ enqueued_jobs_message(adapter, enqueued_jobs)
+ else
+ "#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\
+ "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}"
+ end
+ else
+ failed_enqueue_count = jobs.size - enqueued_count
+ "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} "\
+ "to #{ActiveJob.adapter_name(adapter)}"
+ end
+ end
+ end
+ subscribe_log_level :enqueue_all, :info
def perform_start(event)
info do
job = event.payload[:job]
- "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
+ enqueue_info = job.enqueued_at.present? ? " enqueued at #{job.enqueued_at.utc.iso8601(9)}" : ""
+
+ "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)}" + enqueue_info + args_info(job)
end
end
+ subscribe_log_level :perform_start, :info
def perform(event)
job = event.payload[:job]
@@ -67,6 +100,7 @@ def perform(event)
end
end
end
+ subscribe_log_level :perform, :info
def enqueue_retry(event)
job = event.payload[:job]
@@ -75,34 +109,37 @@ def enqueue_retry(event)
info do
if ex
- "Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
+ "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds, due to a #{ex.class} (#{ex.message})."
else
- "Retrying #{job.class} in #{wait.to_i} seconds."
+ "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds."
end
end
end
+ subscribe_log_level :enqueue_retry, :info
def retry_stopped(event)
job = event.payload[:job]
ex = event.payload[:error]
error do
- "Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
+ "Stopped retrying #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message}), which reoccurred on #{job.executions} attempts."
end
end
+ subscribe_log_level :enqueue_retry, :error
def discard(event)
job = event.payload[:job]
ex = event.payload[:error]
error do
- "Discarded #{job.class} due to a #{ex.class}."
+ "Discarded #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message})."
end
end
+ subscribe_log_level :discard, :error
private
def queue_name(event)
- event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
+ ActiveJob.adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})"
end
def args_info(job)
@@ -134,6 +171,41 @@ def scheduled_at(event)
def logger
ActiveJob::Base.logger
end
+
+ def info(progname = nil, &block)
+ return unless super
+
+ if ActiveJob.verbose_enqueue_logs
+ log_enqueue_source
+ end
+ end
+
+ def error(progname = nil, &block)
+ return unless super
+
+ if ActiveJob.verbose_enqueue_logs
+ log_enqueue_source
+ end
+ end
+
+ def log_enqueue_source
+ source = extract_enqueue_source_location(caller)
+
+ if source
+ logger.info("↳ #{source}")
+ end
+ end
+
+ def extract_enqueue_source_location(locations)
+ backtrace_cleaner.clean(locations.lazy).first
+ end
+
+ def enqueued_jobs_message(adapter, enqueued_jobs)
+ enqueued_count = enqueued_jobs.size
+ job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v }
+ "Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{ActiveJob.adapter_name(adapter)}"\
+ " (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})"
+ end
end
end
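Note on the log_subscriber.rb hunks above: the new `enqueue_all` log line summarizes a batch by tallying jobs per class and listing the most frequent class first. A plain-Ruby sketch of that message builder follows. It takes class names as strings, and `pluralize_job` is a hypothetical replacement for Active Support's `pluralize`.

```ruby
# Hypothetical stand-in for 'job'.pluralize(count) from Active Support.
def pluralize_job(count)
  count == 1 ? "job" : "jobs"
end

# Builds the summary line from an adapter name and a list of job class names.
def enqueued_jobs_message(adapter_name, job_class_names)
  count = job_class_names.size
  per_class = job_class_names.tally.sort_by { |_name, n| -n }
  "Enqueued #{count} #{pluralize_job(count)} to #{adapter_name}" \
    " (#{per_class.map { |name, n| "#{n} #{name}" }.join(', ')})"
end

puts enqueued_jobs_message("Sidekiq", %w[MailJob ReportJob MailJob])
```

`Array#tally` requires Ruby 2.7 or newer, which is consistent with the diff dropping its `respond_to?(:ruby2_keywords, true)` guards.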
diff --git a/activejob/lib/active_job/logging.rb b/activejob/lib/active_job/logging.rb
index 876e7d6acb..65f89e863f 100644
--- a/activejob/lib/active_job/logging.rb
+++ b/activejob/lib/active_job/logging.rb
@@ -4,22 +4,25 @@
require "active_support/logger"
module ActiveJob
- module Logging #:nodoc:
+ module Logging # :nodoc:
extend ActiveSupport::Concern
included do
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
class_attribute :log_arguments, instance_accessor: false, default: true
- around_enqueue { |_, block| tag_logger(&block) }
- around_perform { |job, block| tag_logger(job.class.name, job.job_id, &block) }
+ around_enqueue(prepend: true) { |_, block| tag_logger(&block) }
+ end
+
+ def perform_now
+ tag_logger(self.class.name, self.job_id) { super }
end
private
- def tag_logger(*tags)
+ def tag_logger(*tags, &block)
if logger.respond_to?(:tagged)
tags.unshift "ActiveJob" unless logger_tagged_by_active_job?
- logger.tagged(*tags) { yield }
+ logger.tagged(*tags, &block)
else
yield
end
diff --git a/activejob/lib/active_job/queue_adapter.rb b/activejob/lib/active_job/queue_adapter.rb
index afe8515c83..3f2555deb4 100644
--- a/activejob/lib/active_job/queue_adapter.rb
+++ b/activejob/lib/active_job/queue_adapter.rb
@@ -3,9 +3,20 @@
require "active_support/core_ext/string/inflections"
module ActiveJob
- # The <tt>ActiveJob::QueueAdapter</tt> module is used to load the
+ class << self
+ def adapter_name(adapter) # :nodoc:
+ return adapter.queue_adapter_name if adapter.respond_to?(:queue_adapter_name)
+
+ adapter_class = adapter.is_a?(Module) ? adapter : adapter.class
+ "#{adapter_class.name.demodulize.delete_suffix('Adapter')}"
+ end
+ end
+
+ # = Active Job Queue adapter
+ #
+ # The +ActiveJob::QueueAdapter+ module is used to load the
# correct adapter. The default queue adapter is the +:async+ queue.
- module QueueAdapter #:nodoc:
+ module QueueAdapter # :nodoc:
extend ActiveSupport::Concern
included do
@@ -41,7 +52,7 @@ def queue_adapter=(name_or_adapter)
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
- adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
+ adapter_name = ActiveJob.adapter_name(name_or_adapter).underscore
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
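Note on the queue_adapter.rb hunks above: `ActiveJob.adapter_name` prefers an adapter's own `queue_adapter_name` and otherwise derives the name from the class by dropping the namespace and the `Adapter` suffix. The sketch below shows the same lookup order in plain Ruby. The `SketchAdapters` module is hypothetical, and `split("::").last` stands in for Active Support's `demodulize`.

```ruby
# Hypothetical adapters used only to exercise the lookup.
module SketchAdapters
  class SidekiqAdapter; end

  class CustomAdapter
    def queue_adapter_name
      "custom_queue"
    end
  end
end

# Accepts either an adapter instance or an adapter class.
def adapter_name(adapter)
  return adapter.queue_adapter_name if adapter.respond_to?(:queue_adapter_name)

  adapter_class = adapter.is_a?(Module) ? adapter : adapter.class
  adapter_class.name.split("::").last.delete_suffix("Adapter")
end

puts adapter_name(SketchAdapters::SidekiqAdapter.new)
puts adapter_name(SketchAdapters::SidekiqAdapter)
puts adapter_name(SketchAdapters::CustomAdapter.new)
```

Unlike the earlier `remove('Adapter')`, `delete_suffix` only strips a trailing `Adapter`, so a class name that contains the word elsewhere is left intact.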
diff --git a/activejob/lib/active_job/queue_adapters.rb b/activejob/lib/active_job/queue_adapters.rb
index 27759cf58f..c06e3e0a43 100644
--- a/activejob/lib/active_job/queue_adapters.rb
+++ b/activejob/lib/active_job/queue_adapters.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
module ActiveJob
- # == Active Job adapters
+ # = Active Job adapters
#
# Active Job has adapters for the following queuing backends:
#
@@ -13,10 +13,14 @@ module ActiveJob
# * {Sidekiq}[https://sidekiq.org]
# * {Sneakers}[https://github.com/jondot/sneakers]
# * {Sucker Punch}[https://github.com/brandonhilkert/sucker_punch]
- # * {Active Job Async Job}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]
- # * {Active Job Inline}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html]
# * Please Note: We are not accepting pull requests for new adapters. See the {README}[link:files/activejob/README_md.html] for more details.
#
+ # For testing and development Active Job has three built-in adapters:
+ #
+ # * {Active Job Async}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/AsyncAdapter.html]
+ # * {Active Job Inline}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/InlineAdapter.html]
+ # * {Active Job Test}[https://api.rubyonrails.org/classes/ActiveJob/QueueAdapters/TestAdapter.html]
+ #
# === Backends Features
#
# | | Async | Queues | Delayed | Priorities | Timeout | Retries |
@@ -31,6 +35,7 @@ module ActiveJob
# | Sucker Punch | Yes | Yes | Yes | No | No | No |
# | Active Job Async | Yes | Yes | Yes | No | No | No |
# | Active Job Inline | No | Yes | N/A | N/A | N/A | N/A |
+ # | Active Job Test | No | Yes | N/A | N/A | N/A | N/A |
#
# ==== Async
#
@@ -106,10 +111,6 @@ module ActiveJob
# N/A: The adapter does not run in a separate process, and therefore doesn't
# support retries.
#
- # === Async and Inline Queue Adapters
- #
- # Active Job has two built-in queue adapters intended for development and
- # testing: +:async+ and +:inline+.
module QueueAdapters
extend ActiveSupport::Autoload
diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb
index 53a7e3d53e..8f8e8f2503 100644
--- a/activejob/lib/active_job/queue_adapters/async_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb
@@ -7,7 +7,7 @@
module ActiveJob
module QueueAdapters
- # == Active Job Async adapter
+ # = Active Job Async adapter
#
# The Async adapter runs jobs with an in-process thread pool.
#
@@ -36,23 +36,23 @@ def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
@scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
end
# Gracefully stop processing jobs. Finishes in-progress work and handles
# any new jobs following the executor's fallback policy (`caller_runs`).
# Waits for termination by default. Pass `wait: false` to continue.
- def shutdown(wait: true) #:nodoc:
+ def shutdown(wait: true) # :nodoc:
@scheduler.shutdown wait: wait
end
# Used for our test suite.
- def immediate=(immediate) #:nodoc:
+ def immediate=(immediate) # :nodoc:
@scheduler.immediate = immediate
end
@@ -60,7 +60,7 @@ def immediate=(immediate) #:nodoc:
# performing them in-process, but we do so anyway for parity with other
# adapters and deployment environments. Otherwise, serialization bugs
# may creep in undetected.
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
def initialize(job)
job.provider_job_id = SecureRandom.uuid
@job_data = job.serialize
@@ -71,7 +71,7 @@ def perform
end
end
- class Scheduler #:nodoc:
+ class Scheduler # :nodoc:
DEFAULT_EXECUTOR_OPTIONS = {
min_threads: 0,
max_threads: Concurrent.processor_count,
@@ -95,7 +95,7 @@ def enqueue(job, queue_name:)
def enqueue_at(job, timestamp, queue_name:)
delay = timestamp - Time.current.to_f
- if delay > 0
+ if !immediate && delay > 0
Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
else
enqueue(job, queue_name: queue_name)
diff --git a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
index 7dc49310ac..8442a839ec 100644
--- a/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/backburner_adapter.rb
@@ -4,7 +4,7 @@
module ActiveJob
module QueueAdapters
- # == Backburner adapter for Active Job
+ # = Backburner adapter for Active Job
#
# Backburner is a beanstalkd-powered job queue that can handle a very
# high volume of jobs. You create background jobs and place them on
@@ -15,16 +15,20 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :backburner
class BackburnerAdapter
- def enqueue(job) #:nodoc:
- Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
+ def enqueue(job) # :nodoc:
+ response = Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority)
+ job.provider_job_id = response[:id] if response.is_a?(Hash)
+ response
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
delay = timestamp - Time.current.to_f
- Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority, delay: delay)
+ response = Backburner::Worker.enqueue(JobWrapper, [job.serialize], queue: job.queue_name, pri: job.priority, delay: delay)
+ job.provider_job_id = response[:id] if response.is_a?(Hash)
+ response
end
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
class << self
def perform(job_data)
Base.execute job_data
diff --git a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
index 8eeef32b99..85f5440019 100644
--- a/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/delayed_job_adapter.rb
@@ -1,10 +1,11 @@
# frozen_string_literal: true
require "delayed_job"
+require "active_support/core_ext/string/inflections"
module ActiveJob
module QueueAdapters
- # == Delayed Job adapter for Active Job
+ # = Delayed Job adapter for Active Job
#
# Delayed::Job (or DJ) encapsulates the common pattern of asynchronously
# executing longer tasks in the background. Although DJ can have many
@@ -15,19 +16,19 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :delayed_job
class DelayedJobAdapter
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority)
job.provider_job_id = delayed_job.id
delayed_job
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp))
job.provider_job_id = delayed_job.id
delayed_job
end
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
attr_accessor :job_data
def initialize(job_data)
@@ -35,12 +36,23 @@ def initialize(job_data)
end
def display_name
- "#{job_data['job_class']} [#{job_data['job_id']}] from DelayedJob(#{job_data['queue_name']}) with arguments: #{job_data['arguments']}"
+ base_name = "#{job_data["job_class"]} [#{job_data["job_id"]}] from DelayedJob(#{job_data["queue_name"]})"
+
+ return base_name unless log_arguments?
+
+ "#{base_name} with arguments: #{job_data["arguments"]}"
end
def perform
Base.execute(job_data)
end
+
+ private
+ def log_arguments?
+ job_data["job_class"].constantize.log_arguments?
+ rescue NameError
+ false
+ end
end
end
end
diff --git a/activejob/lib/active_job/queue_adapters/inline_adapter.rb b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
index ca04dc943c..3b8869d838 100644
--- a/activejob/lib/active_job/queue_adapters/inline_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/inline_adapter.rb
@@ -2,7 +2,7 @@
module ActiveJob
module QueueAdapters
- # == Active Job Inline adapter
+ # = Active Job Inline adapter
#
# When enqueuing jobs with the Inline adapter the job will be executed
# immediately.
@@ -11,11 +11,11 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :inline
class InlineAdapter
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
Base.execute(job.serialize)
end
- def enqueue_at(*) #:nodoc:
+ def enqueue_at(*) # :nodoc:
raise NotImplementedError, "Use a queueing backend to enqueue jobs in the future. Read more at https://guides.rubyonrails.org/active_job_basics.html"
end
end
diff --git a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
index ccc1881091..0f06164be0 100644
--- a/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/queue_classic_adapter.rb
@@ -4,7 +4,7 @@
module ActiveJob
module QueueAdapters
- # == queue_classic adapter for Active Job
+ # = queue_classic adapter for Active Job
#
# queue_classic provides a simple interface to a PostgreSQL-backed message
# queue. queue_classic specializes in concurrent locking and minimizing
@@ -19,13 +19,13 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :queue_classic
class QueueClassicAdapter
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
qc_job = build_queue(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.serialize)
job.provider_job_id = qc_job["id"] if qc_job.is_a?(Hash)
qc_job
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
queue = build_queue(job.queue_name)
unless queue.respond_to?(:enqueue_at)
raise NotImplementedError, "To be able to schedule jobs with queue_classic " \
@@ -37,16 +37,16 @@ def enqueue_at(job, timestamp) #:nodoc:
qc_job
end
- # Builds a <tt>QC::Queue</tt> object to schedule jobs on.
+ # Builds a +QC::Queue+ object to schedule jobs on.
#
- # If you have a custom <tt>QC::Queue</tt> subclass you'll need to subclass
- # <tt>ActiveJob::QueueAdapters::QueueClassicAdapter</tt> and override the
+ # If you have a custom +QC::Queue+ subclass you'll need to subclass
+ # +ActiveJob::QueueAdapters::QueueClassicAdapter+ and override the
# <tt>build_queue</tt> method.
def build_queue(queue_name)
QC::Queue.new(queue_name)
end
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
class << self
def perform(job_data)
Base.execute job_data
diff --git a/activejob/lib/active_job/queue_adapters/resque_adapter.rb b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
index 590b4ee98d..0fa330673b 100644
--- a/activejob/lib/active_job/queue_adapters/resque_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/resque_adapter.rb
@@ -16,7 +16,7 @@
module ActiveJob
module QueueAdapters
- # == Resque adapter for Active Job
+ # = Resque adapter for Active Job
#
# Resque (pronounced like "rescue") is a Redis-backed library for creating
# background jobs, placing those jobs on multiple queues, and processing
@@ -28,12 +28,12 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :resque
class ResqueAdapter
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
JobWrapper.instance_variable_set(:@queue, job.queue_name)
Resque.enqueue_to job.queue_name, JobWrapper, job.serialize
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
unless Resque.respond_to?(:enqueue_at_with_queue)
raise NotImplementedError, "To be able to schedule jobs with Resque you need the " \
"resque-scheduler gem. Please add it to your Gemfile and run bundle install"
@@ -41,7 +41,7 @@ def enqueue_at(job, timestamp) #:nodoc:
Resque.enqueue_at_with_queue job.queue_name, timestamp, JobWrapper, job.serialize
end
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
class << self
def perform(job_data)
Base.execute job_data
diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
index e4698f0ee8..55b578ae7c 100644
--- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb
@@ -1,14 +1,15 @@
# frozen_string_literal: true
+gem "sidekiq", ">= 4.1.0"
require "sidekiq"
module ActiveJob
module QueueAdapters
- # == Sidekiq adapter for Active Job
+ # = Sidekiq adapter for Active Job
#
# Simple, efficient background processing for Ruby. Sidekiq uses threads to
# handle many jobs at the same time in the same process. It does not
- # require Rails but will integrate tightly with it to make background
+ # require \Rails but will integrate tightly with it to make background
# processing dead simple.
#
# Read more about Sidekiq {here}[http://sidekiq.org].
@@ -17,25 +18,52 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :sidekiq
class SidekiqAdapter
- def enqueue(job) #:nodoc:
- # Sidekiq::Client does not support symbols as keys
- job.provider_job_id = Sidekiq::Client.push \
- "class" => JobWrapper,
- "wrapped" => job.class,
- "queue" => job.queue_name,
- "args" => [ job.serialize ]
+ def enqueue(job) # :nodoc:
+ job.provider_job_id = JobWrapper.set(
+ wrapped: job.class,
+ queue: job.queue_name
+ ).perform_async(job.serialize)
end
- def enqueue_at(job, timestamp) #:nodoc:
- job.provider_job_id = Sidekiq::Client.push \
- "class" => JobWrapper,
- "wrapped" => job.class,
- "queue" => job.queue_name,
- "args" => [ job.serialize ],
- "at" => timestamp
+ def enqueue_at(job, timestamp) # :nodoc:
+ job.provider_job_id = JobWrapper.set(
+ wrapped: job.class,
+ queue: job.queue_name,
+ ).perform_at(timestamp, job.serialize)
end
- class JobWrapper #:nodoc:
+ def enqueue_all(jobs) # :nodoc:
+ enqueued_count = 0
+ jobs.group_by(&:class).each do |job_class, same_class_jobs|
+ same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs|
+ immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? }
+
+ if immediate_jobs.any?
+ jids = Sidekiq::Client.push_bulk(
+ "class" => JobWrapper,
+ "wrapped" => job_class,
+ "queue" => queue,
+ "args" => immediate_jobs.map { |job| [job.serialize] },
+ )
+ enqueued_count += jids.compact.size
+ end
+
+ if scheduled_jobs.any?
+ jids = Sidekiq::Client.push_bulk(
+ "class" => JobWrapper,
+ "wrapped" => job_class,
+ "queue" => queue,
+ "args" => scheduled_jobs.map { |job| [job.serialize] },
+ "at" => scheduled_jobs.map { |job| job.scheduled_at&.to_f }
+ )
+ enqueued_count += jids.compact.size
+ end
+ end
+ end
+ enqueued_count
+ end
+
+ class JobWrapper # :nodoc:
include Sidekiq::Worker
def perform(job_data)
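
Reviewer note on `enqueue_all` in the Sidekiq adapter hunk above: the method buckets jobs by class, then by queue, then splits each bucket into immediate and scheduled jobs, so that every bulk push carries one class, one queue, and one kind of timing. The plain-Ruby sketch below reproduces only that grouping. `FakeJob`, its `job_class` attribute, and `batches` are hypothetical stand-ins for illustration; it makes no Sidekiq or Active Job calls.

```ruby
# Hypothetical stand-in for a job: only the attributes the grouping reads.
FakeJob = Struct.new(:job_class, :queue_name, :scheduled_at)

# Returns [job_class, queue, kind, jobs] tuples, one per bulk push
# that the grouping in the hunk above would issue.
def batches(jobs)
  jobs.group_by(&:job_class).flat_map do |job_class, same_class_jobs|
    same_class_jobs.group_by(&:queue_name).flat_map do |queue, same_queue_jobs|
      immediate, scheduled = same_queue_jobs.partition { |job| job.scheduled_at.nil? }
      [
        [job_class, queue, :immediate, immediate],
        [job_class, queue, :scheduled, scheduled]
      ].reject { |batch| batch.last.empty? } # skip empty pushes, like the `any?` guards
    end
  end
end

jobs = [
  FakeJob.new("MailJob", "default", nil),
  FakeJob.new("MailJob", "default", Time.now + 60),
  FakeJob.new("MailJob", "low", nil),
  FakeJob.new("SyncJob", "default", nil)
]

result = batches(jobs)
result.each { |job_class, queue, kind, group| puts "#{job_class} #{queue} #{kind} #{group.size}" }
```

Four jobs yield four batches here because the two `MailJob` entries on `default` differ in timing and cannot share a push.
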
diff --git a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
index de98a950d0..1a895d0fd3 100644
--- a/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sneakers_adapter.rb
@@ -5,7 +5,7 @@
module ActiveJob
module QueueAdapters
- # == Sneakers adapter for Active Job
+ # = Sneakers adapter for Active Job
#
# A high-performance RabbitMQ background processing framework for Ruby.
# Sneakers is being used in production for both I/O and CPU intensive
@@ -22,18 +22,18 @@ def initialize
@monitor = Monitor.new
end
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
raise NotImplementedError, "This queueing backend does not support scheduling jobs. To see what features are supported go to http://api.rubyonrails.org/classes/ActiveJob/QueueAdapters.html"
end
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
include Sneakers::Worker
from_queue "default"
diff --git a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
index c520cbcafb..2ef1f202ed 100644
--- a/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb
@@ -4,13 +4,13 @@
module ActiveJob
module QueueAdapters
- # == Sucker Punch adapter for Active Job
+ # = Sucker Punch adapter for Active Job
#
# Sucker Punch is a single-process Ruby asynchronous processing library.
# This reduces the cost of hosting on a service like Heroku along
# with the memory footprint of having to maintain additional jobs if
# hosting on a dedicated server. All queues can run within a
- # single application (e.g. Rails, Sinatra, etc.) process.
+ # single application (e.g. \Rails, Sinatra, etc.) process.
#
# Read more about Sucker Punch {here}[https://github.com/brandonhilkert/sucker_punch].
#
@@ -18,7 +18,7 @@ module QueueAdapters
#
# Rails.application.config.active_job.queue_adapter = :sucker_punch
class SuckerPunchAdapter
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
if JobWrapper.respond_to?(:perform_async)
# sucker_punch 2.0 API
JobWrapper.perform_async job.serialize
@@ -28,16 +28,16 @@ def enqueue(job) #:nodoc:
end
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
if JobWrapper.respond_to?(:perform_in)
delay = timestamp - Time.current.to_f
JobWrapper.perform_in delay, job.serialize
else
- raise NotImplementedError, "sucker_punch 1.0 does not support `enqueued_at`. Please upgrade to version ~> 2.0.0 to enable this behavior."
+ raise NotImplementedError, "sucker_punch 1.0 does not support `enqueue_at`. Please upgrade to version ~> 2.0.0 to enable this behavior."
end
end
- class JobWrapper #:nodoc:
+ class JobWrapper # :nodoc:
include SuckerPunch::Job
def perform(job_data)
diff --git a/activejob/lib/active_job/queue_adapters/test_adapter.rb b/activejob/lib/active_job/queue_adapters/test_adapter.rb
index 9bd23a0aa7..6e1c8ec6a2 100644
--- a/activejob/lib/active_job/queue_adapters/test_adapter.rb
+++ b/activejob/lib/active_job/queue_adapters/test_adapter.rb
@@ -2,13 +2,13 @@
module ActiveJob
module QueueAdapters
- # == Test adapter for Active Job
+ # = Test adapter for Active Job
#
# The test adapter should be used only in testing. Along with
- # <tt>ActiveJob::TestCase</tt> and <tt>ActiveJob::TestHelper</tt>
- # it makes a great tool to test your Rails application.
+ # ActiveJob::TestCase and ActiveJob::TestHelper
+ # it makes a great tool to test your \Rails application.
#
- # To use the test adapter set queue_adapter config to +:test+.
+ # To use the test adapter set +queue_adapter+ config to +:test+.
#
# Rails.application.config.active_job.queue_adapter = :test
class TestAdapter
@@ -25,12 +25,12 @@ def performed_jobs
@performed_jobs ||= []
end
- def enqueue(job) #:nodoc:
+ def enqueue(job) # :nodoc:
job_data = job_to_hash(job)
perform_or_enqueue(perform_enqueued_jobs && !filtered?(job), job, job_data)
end
- def enqueue_at(job, timestamp) #:nodoc:
+ def enqueue_at(job, timestamp) # :nodoc:
job_data = job_to_hash(job, at: timestamp)
perform_or_enqueue(perform_enqueued_at_jobs && !filtered?(job), job, job_data)
end
@@ -41,6 +41,7 @@ def job_to_hash(job, extras = {})
job_data[:job] = job.class
job_data[:args] = job_data.fetch("arguments")
job_data[:queue] = job_data.fetch("queue_name")
+ job_data[:priority] = job_data.fetch("priority")
end.merge(extras)
end
@@ -58,7 +59,7 @@ def filtered?(job)
end
def filtered_time?(job)
- job.scheduled_at > at.to_f if at && job.scheduled_at
+ job.scheduled_at > at if at && job.scheduled_at
end
def filtered_queue?(job)
diff --git a/activejob/lib/active_job/queue_name.rb b/activejob/lib/active_job/queue_name.rb
index 4c028502f4..e6225ce0d7 100644
--- a/activejob/lib/active_job/queue_name.rb
+++ b/activejob/lib/active_job/queue_name.rb
@@ -19,8 +19,7 @@ module ClassMethods
# end
#
# Can be given a block that will evaluate in the context of the job
- # allowing +self.arguments+ to be accessed so that a dynamic queue name
- # can be applied:
+ # so that a dynamic queue name can be applied:
#
# class PublishToFeedJob < ApplicationJob
# queue_as do
@@ -45,7 +44,7 @@ def queue_as(part_name = nil, &block)
end
end
- def queue_name_from_part(part_name) #:nodoc:
+ def queue_name_from_part(part_name) # :nodoc:
queue_name = part_name || default_queue_name
name_parts = [queue_name_prefix.presence, queue_name]
-name_parts.compact.join(queue_name_delimiter)
diff --git a/activejob/lib/active_job/queue_priority.rb b/activejob/lib/active_job/queue_priority.rb
index 063bccdb01..2cab9798de 100644
--- a/activejob/lib/active_job/queue_priority.rb
+++ b/activejob/lib/active_job/queue_priority.rb
@@ -18,7 +18,24 @@ module ClassMethods
# end
# end
#
- # Specify either an argument or a block.
+ # Can be given a block that will evaluate in the context of the job
+ # so that a dynamic priority can be applied:
+ #
+ # class PublishToFeedJob < ApplicationJob
+ # queue_with_priority do
+ # post = self.arguments.first
+ #
+ # if post.paid?
+ # 10
+ # else
+ # 50
+ # end
+ # end
+ #
+ # def perform(post)
+ # post.to_feed!
+ # end
+ # end
def queue_with_priority(priority = nil, &block)
if block_given?
self.priority = block
diff --git a/activejob/lib/active_job/railtie.rb b/activejob/lib/active_job/railtie.rb
index 3064a591e7..f6073f0c17 100644
--- a/activejob/lib/active_job/railtie.rb
+++ b/activejob/lib/active_job/railtie.rb
@@ -8,6 +8,11 @@ module ActiveJob
class Railtie < Rails::Railtie # :nodoc:
config.active_job = ActiveSupport::OrderedOptions.new
config.active_job.custom_serializers = []
+ config.active_job.log_query_tags_around_perform = true
+
+ initializer "active_job.deprecator", before: :load_environment_config do |app|
+ app.deprecators[:active_job] = ActiveJob.deprecator
+ end
initializer "active_job.logger" do
ActiveSupport.on_load(:active_job) { self.logger = ::Rails.logger }
@@ -15,7 +20,7 @@ class Railtie < Rails::Railtie # :nodoc:
initializer "active_job.custom_serializers" do |app|
config.after_initialize do
- custom_serializers = app.config.active_job.delete(:custom_serializers)
+ custom_serializers = app.config.active_job.custom_serializers
ActiveJob::Serializers.add_serializers custom_serializers
end
end
@@ -24,20 +29,35 @@ class Railtie < Rails::Railtie # :nodoc:
options = app.config.active_job
options.queue_adapter ||= :async
+ config.after_initialize do
+ options.each do |k, v|
+ k = "#{k}="
+ if ActiveJob.respond_to?(k)
+ ActiveJob.send(k, v)
+ end
+ end
+ end
+
ActiveSupport.on_load(:active_job) do
- options.each do |k, v|
+ # Configs used in other initializers
+ options = options.except(
+ :log_query_tags_around_perform,
+ :custom_serializers
+ )
+
+ options.each do |k, v|
k = "#{k}="
- send(k, v) if respond_to? k
+ if ActiveJob.respond_to?(k)
+ ActiveJob.send(k, v)
+ elsif respond_to? k
+ send(k, v)
+ end
end
end
ActiveSupport.on_load(:action_dispatch_integration_test) do
include ActiveJob::TestHelper
end
-
- ActiveSupport.on_load(:active_record) do
- self.destroy_association_async_job = ActiveRecord::DestroyAssociationAsyncJob
- end
end
initializer "active_job.set_reloader_hook" do |app|
@@ -49,5 +69,25 @@ class Railtie < Rails::Railtie # :nodoc:
end
end
end
+
+ initializer "active_job.query_log_tags" do |app|
+ query_logs_tags_enabled = app.config.respond_to?(:active_record) &&
+ app.config.active_record.query_log_tags_enabled &&
+ app.config.active_job.log_query_tags_around_perform
+
+ if query_logs_tags_enabled
+ app.config.active_record.query_log_tags |= [:job]
+
+ ActiveSupport.on_load(:active_record) do
+ ActiveRecord::QueryLogs.taggings[:job] = ->(context) { context[:job].class.name if context[:job] }
+ end
+ end
+ end
+
+ initializer "active_job.backtrace_cleaner" do
+ ActiveSupport.on_load(:active_job) do
+ LogSubscriber.backtrace_cleaner = ::Rails.backtrace_cleaner
+ end
+ end
end
end
diff --git a/activejob/lib/active_job/serializers.rb b/activejob/lib/active_job/serializers.rb
index 9a931c36ab..3d156484ec 100644
--- a/activejob/lib/active_job/serializers.rb
+++ b/activejob/lib/active_job/serializers.rb
@@ -3,7 +3,9 @@
require "set"
module ActiveJob
- # The <tt>ActiveJob::Serializers</tt> module is used to store a list of known serializers
+ # = Active Job \Serializers
+ #
+ # The +ActiveJob::Serializers+ module is used to store a list of known serializers
# and to add new ones. It also has helpers to serialize/deserialize objects.
module Serializers # :nodoc:
extend ActiveSupport::Autoload
@@ -17,6 +19,8 @@ module Serializers # :nodoc:
autoload :TimeWithZoneSerializer
autoload :TimeSerializer
autoload :ModuleSerializer
+ autoload :RangeSerializer
+ autoload :BigDecimalSerializer
mattr_accessor :_additional_serializers
self._additional_serializers = Set.new
@@ -24,7 +28,7 @@ module Serializers # :nodoc:
class << self
# Returns serialized representative of the passed object.
# Will look up through all known serializers.
- # Raises <tt>ActiveJob::SerializationError</tt> if it can't find a proper serializer.
+ # Raises ActiveJob::SerializationError if it can't find a proper serializer.
def serialize(argument)
serializer = serializers.detect { |s| s.serialize?(argument) }
raise SerializationError.new("Unsupported argument type: #{argument.class.name}") unless serializer
@@ -61,6 +65,8 @@ def add_serializers(*new_serializers)
DateSerializer,
TimeWithZoneSerializer,
TimeSerializer,
- ModuleSerializer
+ ModuleSerializer,
+ RangeSerializer,
+ BigDecimalSerializer
end
end
diff --git a/activejob/lib/active_job/serializers/big_decimal_serializer.rb b/activejob/lib/active_job/serializers/big_decimal_serializer.rb
new file mode 100644
index 0000000000..1d03f872af
--- /dev/null
+++ b/activejob/lib/active_job/serializers/big_decimal_serializer.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+require "bigdecimal"
+
+module ActiveJob
+ module Serializers
+ class BigDecimalSerializer < ObjectSerializer # :nodoc:
+ def serialize(big_decimal)
+ super("value" => big_decimal.to_s)
+ end
+
+ def deserialize(hash)
+ BigDecimal(hash["value"])
+ end
+
+ private
+ def klass
+ BigDecimal
+ end
+ end
+ end
+end
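
Reviewer note on the new `BigDecimalSerializer` above: it stores the decimal as a string and rebuilds it with `BigDecimal(...)`, which avoids going through a Float. A minimal sketch of that round trip, using only the standard `bigdecimal` library; `dump_big_decimal` and `load_big_decimal` are hypothetical helper names, not part of Active Job, and the sketch omits the extra metadata added by `ObjectSerializer#serialize`.

```ruby
require "bigdecimal"

# Hypothetical helpers mirroring the serializer's two methods.
def dump_big_decimal(big_decimal)
  { "value" => big_decimal.to_s } # string form keeps every digit
end

def load_big_decimal(hash)
  BigDecimal(hash["value"])
end

original = BigDecimal("1234.5678901234567890")
payload  = dump_big_decimal(original)
restored = load_big_decimal(payload)

puts payload["value"]
puts restored == original
```
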
diff --git a/activejob/lib/active_job/serializers/duration_serializer.rb b/activejob/lib/active_job/serializers/duration_serializer.rb
index b35eaf5f60..ab0696adaa 100644
--- a/activejob/lib/active_job/serializers/duration_serializer.rb
+++ b/activejob/lib/active_job/serializers/duration_serializer.rb
@@ -4,14 +4,16 @@ module ActiveJob
module Serializers
class DurationSerializer < ObjectSerializer # :nodoc:
def serialize(duration)
+ # Ideally duration.parts would be wrapped in an array before passing to Arguments.serialize,
+ # but we continue passing the bare hash for backwards compatibility:
super("value" => duration.value, "parts" => Arguments.serialize(duration.parts))
end
def deserialize(hash)
value = hash["value"]
parts = Arguments.deserialize(hash["parts"])
-
- klass.new(value, parts)
+ # `parts` is originally a hash, but will have been flattened to an array by Arguments.serialize
+ klass.new(value, parts.to_h)
end
private
diff --git a/activejob/lib/active_job/serializers/module_serializer.rb b/activejob/lib/active_job/serializers/module_serializer.rb
index bea747a68e..081a1d9c55 100644
--- a/activejob/lib/active_job/serializers/module_serializer.rb
+++ b/activejob/lib/active_job/serializers/module_serializer.rb
@@ -4,6 +4,7 @@ module ActiveJob
module Serializers
class ModuleSerializer < ObjectSerializer # :nodoc:
def serialize(constant)
+ raise SerializationError, "Serializing an anonymous class is not supported" unless constant.name
super("value" => constant.name)
end
diff --git a/activejob/lib/active_job/serializers/range_serializer.rb b/activejob/lib/active_job/serializers/range_serializer.rb
new file mode 100644
index 0000000000..3d21c251fe
--- /dev/null
+++ b/activejob/lib/active_job/serializers/range_serializer.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module ActiveJob
+ module Serializers
+ class RangeSerializer < ObjectSerializer
+ KEYS = %w[begin end exclude_end].freeze
+
+ def serialize(range)
+ args = Arguments.serialize([range.begin, range.end, range.exclude_end?])
+ super(KEYS.zip(args).to_h)
+ end
+
+ def deserialize(hash)
+ klass.new(*Arguments.deserialize(hash.values_at(*KEYS)))
+ end
+
+ private
+ def klass
+ ::Range
+ end
+ end
+ end
+end
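
Reviewer note on the new `RangeSerializer` above: a range is reduced to its three components (`begin`, `end`, `exclude_end?`) and rebuilt with `Range.new`. The sketch below shows the same idea in plain Ruby. `dump_range` and `load_range` are hypothetical names, and unlike the real serializer the sketch does not pass the endpoints through `Arguments.serialize`, so it only illustrates the key layout, not the handling of non-primitive endpoints.

```ruby
KEYS = %w[begin end exclude_end].freeze

# Hypothetical helper: flatten a range into a string-keyed hash.
def dump_range(range)
  KEYS.zip([range.begin, range.end, range.exclude_end?]).to_h
end

# Hypothetical helper: rebuild the range from the hash in KEYS order.
def load_range(hash)
  Range.new(*hash.values_at(*KEYS))
end

inclusive = load_range(dump_range(1..5))
exclusive = load_range(dump_range(1...5))

puts inclusive.inspect
puts exclusive.inspect
```

Keeping `exclude_end` as its own key is what lets `1..5` and `1...5` survive the round trip as distinct ranges.
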
diff --git a/activejob/lib/active_job/serializers/time_with_zone_serializer.rb b/activejob/lib/active_job/serializers/time_with_zone_serializer.rb
index 9b89187840..3b68f0f3f5 100644
--- a/activejob/lib/active_job/serializers/time_with_zone_serializer.rb
+++ b/activejob/lib/active_job/serializers/time_with_zone_serializer.rb
@@ -2,9 +2,18 @@
module ActiveJob
module Serializers
- class TimeWithZoneSerializer < TimeObjectSerializer # :nodoc:
+ class TimeWithZoneSerializer < ObjectSerializer # :nodoc:
+ NANO_PRECISION = 9
+
+ def serialize(time_with_zone)
+ super(
+ "value" => time_with_zone.iso8601(NANO_PRECISION),
+ "time_zone" => time_with_zone.time_zone.tzinfo.name
+ )
+ end
+
def deserialize(hash)
- Time.iso8601(hash["value"]).in_time_zone
+ Time.iso8601(hash["value"]).in_time_zone(hash["time_zone"] || Time.zone)
end
private
diff --git a/activejob/lib/active_job/test_helper.rb b/activejob/lib/active_job/test_helper.rb
index 2d71ac96b6..e4f376944c 100644
--- a/activejob/lib/active_job/test_helper.rb
+++ b/activejob/lib/active_job/test_helper.rb
@@ -34,7 +34,9 @@ def enable_test_adapter(test_adapter)
end
end
- ActiveJob::Base.include(TestQueueAdapter)
+ ActiveSupport.on_load(:active_job) do
+ ActiveJob::Base.include(TestQueueAdapter)
+ end
def before_setup # :nodoc:
test_adapter = queue_adapter_for_test
@@ -54,15 +56,10 @@ def after_teardown # :nodoc:
queue_adapter_changed_jobs.each { |klass| klass.disable_test_adapter }
end
- # Specifies the queue adapter to use with all Active Job test helpers.
- #
- # Returns an instance of the queue adapter and defaults to
- # <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
- #
- # Note: The adapter provided by this method must provide some additional
- # methods from those expected of a standard <tt>ActiveJob::QueueAdapter</tt>
- # in order to be used with the active job test helpers. Refer to
- # <tt>ActiveJob::QueueAdapters::TestAdapter</tt>.
+ # Returns a queue adapter instance to use with all Active Job test helpers.
+ # By default, returns an instance of ActiveJob::QueueAdapters::TestAdapter.
+ # Override this method to specify a different adapter. The adapter must
+ # implement the same interface as ActiveJob::QueueAdapters::TestAdapter.
def queue_adapter_for_test
ActiveJob::QueueAdapters::TestAdapter.new
end
@@ -109,7 +106,7 @@ def queue_adapter_for_test
# end
# end
#
- # +:only+ and +:except+ options accepts Class, Array of Class or Proc. When passed a Proc,
+ # +:only+ and +:except+ options accept Class, Array of Class, or Proc. When passed a Proc,
# a hash containing the job's class and its argument are passed as argument.
#
# Asserts the number of times a job is enqueued to a specific queue by passing +:queue+ option.
@@ -124,7 +121,7 @@ def assert_enqueued_jobs(number, only: nil, except: nil, queue: nil, &block)
if block_given?
original_jobs = enqueued_jobs_with(only: only, except: except, queue: queue)
- assert_nothing_raised(&block)
+ _assert_nothing_raised_or_warn("assert_enqueued_jobs", &block)
new_jobs = enqueued_jobs_with(only: only, except: except, queue: queue)
@@ -168,7 +165,7 @@ def assert_enqueued_jobs(number, only: nil, except: nil, queue: nil, &block)
# end
# end
#
- # +:only+ and +:except+ options accepts Class, Array of Class or Proc. When passed a Proc,
+ # +:only+ and +:except+ options accept Class, Array of Class, or Proc. When passed a Proc,
# a hash containing the job's class and its argument are passed as argument.
#
# Asserts that no jobs are enqueued to a specific queue by passing +:queue+ option
@@ -325,7 +322,7 @@ def assert_performed_jobs(number, only: nil, except: nil, queue: nil, &block)
# end
# end
#
- # +:only+ and +:except+ options accepts Class, Array of Class or Proc. When passed a Proc,
+ # +:only+ and +:except+ options accept Class, Array of Class, or Proc. When passed a Proc,
# an instance of the job will be passed as argument.
#
# If the +:queue+ option is specified,
@@ -354,6 +351,13 @@ def assert_no_performed_jobs(only: nil, except: nil, queue: nil, &block)
# assert_enqueued_with(at: Date.tomorrow.noon, queue: "my_queue")
# end
#
+ # For keyword arguments, specify them as a hash inside an array:
+ #
+ # def test_assert_enqueued_with_keyword_arguments
+ # MyJob.perform_later(arg1: 'value1', arg2: 'value2')
+ # assert_enqueued_with(job: MyJob, args: [{ arg1: 'value1', arg2: 'value2' }])
+ # end
+ #
# The given arguments may also be specified as matcher procs that return a
# boolean value indicating whether a job's attribute meets certain criteria.
#
@@ -389,15 +393,15 @@ def assert_no_performed_jobs(only: nil, except: nil, queue: nil, &block)
# MyJob.set(wait_until: Date.tomorrow.noon).perform_later
# end
# end
- def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil, &block)
- expected = { job: job, args: args, at: at, queue: queue }.compact
+ def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil, priority: nil, &block)
+ expected = { job: job, args: args, at: at, queue: queue, priority: priority }.compact
expected_args = prepare_args_for_assertion(expected)
potential_matches = []
if block_given?
original_enqueued_jobs = enqueued_jobs.dup
- assert_nothing_raised(&block)
+ _assert_nothing_raised_or_warn("assert_enqueued_with", &block)
jobs = enqueued_jobs - original_enqueued_jobs
else
@@ -417,8 +421,20 @@ def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil, &block)
end
end
+ matching_class = potential_matches.select do |enqueued_job|
+ enqueued_job["job_class"] == job.to_s
+ end
+
message = +"No enqueued job found with #{expected}"
- message << "\n\nPotential matches: #{potential_matches.join("\n")}" if potential_matches.present?
+ if potential_matches.empty?
+ message << "\n\nNo jobs were enqueued"
+ elsif matching_class.empty?
+ message << "\n\nNo jobs of class #{expected[:job]} were enqueued, job classes enqueued: "
+ message << potential_matches.map { |job| job["job_class"] }.join(", ")
+ else
+ message << "\n\nPotential matches: #{matching_class.join("\n")}"
+ end
+
assert matching_job, message
instantiate_job(matching_job)
end
@@ -479,8 +495,8 @@ def assert_enqueued_with(job: nil, args: nil, at: nil, queue: nil, &block)
# MyJob.set(wait_until: Date.tomorrow.noon).perform_later
# end
# end
- def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
- expected = { job: job, args: args, at: at, queue: queue }.compact
+ def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, priority: nil, &block)
+ expected = { job: job, args: args, at: at, queue: queue, priority: priority }.compact
expected_args = prepare_args_for_assertion(expected)
potential_matches = []
@@ -507,8 +523,20 @@ def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
end
end
+ matching_class = potential_matches.select do |enqueued_job|
+ enqueued_job["job_class"] == job.to_s
+ end
+
message = +"No performed job found with #{expected}"
- message << "\n\nPotential matches: #{potential_matches.join("\n")}" if potential_matches.present?
+ if potential_matches.empty?
+ message << "\n\nNo jobs were performed"
+ elsif matching_class.empty?
+ message << "\n\nNo jobs of class #{expected[:job]} were performed, job classes performed: "
+ message << potential_matches.map { |job| job["job_class"] }.join(", ")
+ else
+ message << "\n\nPotential matches: #{matching_class.join("\n")}"
+ end
+
assert matching_job, message
instantiate_job(matching_job)
@@ -555,7 +583,7 @@ def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
# assert_performed_jobs 1
# end
#
- # +:only+ and +:except+ options accepts Class, Array of Class or Proc. When passed a Proc,
+ # +:only+ and +:except+ options accept Class, Array of Class, or Proc. When passed a Proc,
# an instance of the job will be passed as argument.
#
# If the +:queue+ option is specified,
@@ -569,11 +597,17 @@ def assert_performed_with(job: nil, args: nil, at: nil, queue: nil, &block)
# assert_performed_jobs 1
# end
#
- # If the +:at+ option is specified, then only run jobs enqueued to run
- # immediately or before the given time
+ # If the +:at+ option is specified, then only jobs that have been enqueued
+ # to run at or before the given time will be performed. This includes jobs
+ # that have been enqueued without a time.
+ #
+ # If queue_adapter_for_test is overridden to return a different adapter,
+ # +perform_enqueued_jobs+ will merely execute the block.
def perform_enqueued_jobs(only: nil, except: nil, queue: nil, at: nil, &block)
return flush_enqueued_jobs(only: only, except: except, queue: queue, at: at) unless block_given?
+ return _assert_nothing_raised_or_warn("perform_enqueued_jobs", &block) unless using_test_adapter?
+
validate_option(only: only, except: except)
old_perform_enqueued_jobs = queue_adapter.perform_enqueued_jobs
@@ -591,7 +625,7 @@ def perform_enqueued_jobs(only: nil, except: nil, queue: nil, at: nil, &block)
queue_adapter.queue = queue
queue_adapter.at = at
- assert_nothing_raised(&block)
+ _assert_nothing_raised_or_warn("perform_enqueued_jobs", &block)
ensure
queue_adapter.perform_enqueued_jobs = old_perform_enqueued_jobs
queue_adapter.perform_enqueued_at_jobs = old_perform_enqueued_at_jobs
@@ -612,12 +646,16 @@ def queue_adapter
end
private
+ def using_test_adapter?
+ queue_adapter.is_a?(ActiveJob::QueueAdapters::TestAdapter)
+ end
+
def clear_enqueued_jobs
- enqueued_jobs.clear
+ enqueued_jobs.clear if using_test_adapter?
end
def clear_performed_jobs
- performed_jobs.clear
+ performed_jobs.clear if using_test_adapter?
end
def jobs_with(jobs, only: nil, except: nil, queue: nil, at: nil)
@@ -664,12 +702,16 @@ def flush_enqueued_jobs(only: nil, except: nil, queue: nil, at: nil)
enqueued_jobs_with(only: only, except: except, queue: queue, at: at) do |payload|
queue_adapter.enqueued_jobs.delete(payload)
queue_adapter.performed_jobs << payload
- instantiate_job(payload).perform_now
+ instantiate_job(payload, skip_deserialize_arguments: true).perform_now
end.count
end
def prepare_args_for_assertion(args)
args.dup.tap do |arguments|
+ if arguments[:queue].is_a?(Symbol)
+ arguments[:queue] = arguments[:queue].to_s
+ end
+
if arguments[:at].acts_like?(:time)
at_range = arguments[:at] - 1..arguments[:at] + 1
arguments[:at] = ->(at) { at_range.cover?(at) }
@@ -684,10 +726,10 @@ def deserialize_args_for_assertion(job)
end
end
- def instantiate_job(payload)
+ def instantiate_job(payload, skip_deserialize_arguments: false)
job = payload[:job].deserialize(payload)
job.scheduled_at = Time.at(payload[:at]) if payload.key?(:at)
- job.send(:deserialize_arguments_if_needed)
+ job.send(:deserialize_arguments_if_needed) unless skip_deserialize_arguments
job
end
diff --git a/activejob/lib/active_job/timezones.rb b/activejob/lib/active_job/timezones.rb
index ac018eb752..3a625f658f 100644
--- a/activejob/lib/active_job/timezones.rb
+++ b/activejob/lib/active_job/timezones.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
module ActiveJob
- module Timezones #:nodoc:
+ module Timezones # :nodoc:
extend ActiveSupport::Concern
included do
diff --git a/activejob/lib/active_job/translation.rb b/activejob/lib/active_job/translation.rb
index 0fd9b9fc06..a90950b3d0 100644
--- a/activejob/lib/active_job/translation.rb
+++ b/activejob/lib/active_job/translation.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
module ActiveJob
- module Translation #:nodoc:
+ module Translation # :nodoc:
extend ActiveSupport::Concern
included do
diff --git a/activejob/lib/active_job/version.rb b/activejob/lib/active_job/version.rb
index eae7da4d05..baa844bbeb 100644
--- a/activejob/lib/active_job/version.rb
+++ b/activejob/lib/active_job/version.rb
@@ -3,7 +3,7 @@
require_relative "gem_version"
module ActiveJob
- # Returns the version of the currently loaded Active Job as a <tt>Gem::Version</tt>
+ # Returns the currently loaded version of Active Job as a +Gem::Version+.
def self.version
gem_version
end
diff --git a/activejob/lib/rails/generators/job/USAGE b/activejob/lib/rails/generators/job/USAGE
new file mode 100644
index 0000000000..41f8e4468c
--- /dev/null
+++ b/activejob/lib/rails/generators/job/USAGE
@@ -0,0 +1,19 @@
+Description:
+ Generates a new job. Pass the job name, either CamelCased or
+ under_scored, with or without the job postfix.
+
+Examples:
+ `bin/rails generate job checkout`
+
+ Creates the following files:
+
+ Job: app/jobs/checkout_job.rb
+ Test: test/jobs/checkout_job_test.rb
+
+ `bin/rails generate job send_sms --queue=sms`
+
+ Creates a job and test with a custom sms queue.
+
+ `bin/rails generate job process_payment --parent=payment_job`
+
+ Creates a job and test with a `PaymentJob` parent class.
diff --git a/activejob/lib/rails/generators/job/job_generator.rb b/activejob/lib/rails/generators/job/job_generator.rb
index 03346a7f12..b08c097ea8 100644
--- a/activejob/lib/rails/generators/job/job_generator.rb
+++ b/activejob/lib/rails/generators/job/job_generator.rb
@@ -5,10 +5,10 @@
module Rails # :nodoc:
module Generators # :nodoc:
class JobGenerator < Rails::Generators::NamedBase # :nodoc:
- desc "This generator creates an active job file at app/jobs"
-
class_option :queue, type: :string, default: "default", desc: "The queue name for the generated job"
+ class_option :parent, type: :string, default: "ApplicationJob", desc: "The parent class for the generated job"
+
check_class_collision suffix: "Job"
hook_for :test_framework
@@ -28,6 +28,10 @@ def create_job_file
end
private
+ def parent_class_name
+ options[:parent]
+ end
+
def file_name
@_file_name ||= super.sub(/_job\z/i, "")
end
diff --git a/activejob/lib/rails/generators/job/templates/job.rb.tt b/activejob/lib/rails/generators/job/templates/job.rb.tt
index 4ad2914a45..196f229d10 100644
--- a/activejob/lib/rails/generators/job/templates/job.rb.tt
+++ b/activejob/lib/rails/generators/job/templates/job.rb.tt
@@ -1,5 +1,5 @@
<% module_namespacing do -%>
-class <%= class_name %>Job < ApplicationJob
+class <%= class_name %>Job < <%= parent_class_name.classify %>
queue_as :<%= options[:queue] %>
def perform(*args)
diff --git a/activejob/test/adapters/delayed_job.rb b/activejob/test/adapters/delayed_job.rb
index 904b4c3f90..e6c18a8bdb 100644
--- a/activejob/test/adapters/delayed_job.rb
+++ b/activejob/test/adapters/delayed_job.rb
@@ -3,6 +3,7 @@
ActiveJob::Base.queue_adapter = :delayed_job
$LOAD_PATH << File.expand_path("../support/delayed_job", __dir__)
+require "active_support/core_ext/kernel/reporting"
Delayed::Worker.delay_jobs = false
Delayed::Worker.backend = :test
diff --git a/activejob/test/cases/argument_serialization_test.rb b/activejob/test/cases/argument_serialization_test.rb
index 8f47ec6ae2..fec7560431 100644
--- a/activejob/test/cases/argument_serialization_test.rb
+++ b/activejob/test/cases/argument_serialization_test.rb
@@ -1,10 +1,15 @@
# frozen_string_literal: true
+require "json"
+require "bigdecimal"
require "helper"
require "active_job/arguments"
require "models/person"
require "active_support/core_ext/hash/indifferent_access"
+require "active_support/core_ext/integer/time"
+require "active_support/duration"
require "jobs/kwargs_job"
+require "jobs/arguments_round_trip_job"
require "support/stubs/strong_parameters"
class ArgumentSerializationTest < ActiveSupport::TestCase
@@ -14,12 +19,38 @@ class ClassArgument; end
class ClassArgument; end
+ class MyClassWithPermitted
+ def self.permitted?
+ end
+ end
+
+ class MyString < String
+ end
+
+ class MyStringSerializer < ActiveJob::Serializers::ObjectSerializer
+ def serialize(argument)
+ super({ "value" => argument.to_s })
+ end
+
+ def deserialize(hash)
+ MyString.new(hash["value"])
+ end
+
+ private
+ def klass
+ MyString
+ end
+ end
+
+ class StringWithoutSerializer < String
+ end
+
setup do
@person = Person.find("5")
end
[ nil, 1, 1.0, 1_000_000_000_000_000_000_000,
- "a", true, false, BigDecimal(5),
+ "a", true, false,
:a,
1.day,
Date.new(2001, 2, 3),
@@ -30,14 +61,49 @@ class ClassArgument; end
{ "a" => 1 },
ModuleArgument,
ModuleArgument::ClassArgument,
- ClassArgument
+ ClassArgument,
+ 1..,
+ 1...,
+ 1..5,
+ 1...5,
+ "a".."z",
+ "A".."Z",
+ Date.new(2001, 2, 3)..,
+ 10.days.ago..Date.today,
+ Time.new(2002, 10, 31, 2, 2, 2.123456789r, "+02:00")..,
+ 10.hours.ago..Time.current,
+ DateTime.new(2001, 2, 3, 4, 5, 6.123456r, "+03:00")..,
+ (DateTime.current - 4.weeks)..DateTime.current,
+ ActiveSupport::TimeWithZone.new(Time.utc(1999, 12, 31, 23, 59, "59.123456789".to_r), ActiveSupport::TimeZone["UTC"])..,
].each do |arg|
test "serializes #{arg.class} - #{arg.inspect} verbatim" do
assert_arguments_unchanged arg
end
end
- [ Object.new, Person.find("5").to_gid ].each do |arg|
+ test "dangerously treats BigDecimal arguments as primitives not requiring serialization by default" do
+ assert_deprecated(<<~MSG.chomp, ActiveJob.deprecator) do
+ Primitive serialization of BigDecimal job arguments is deprecated as it may serialize via .to_s using certain queue adapters.
+ Enable config.active_job.use_big_decimal_serializer to use BigDecimalSerializer instead, which will be mandatory in Rails 7.2.
+
+ Note that if your application has multiple replicas, you should only enable this setting after successfully deploying your app to Rails 7.1 first.
+ This will ensure that during your deployment all replicas are capable of deserializing arguments serialized with BigDecimalSerializer.
+ MSG
+ assert_equal(
+ BigDecimal(5),
+ *ActiveJob::Arguments.deserialize(ActiveJob::Arguments.serialize([BigDecimal(5)])),
+ )
+ end
+ end
+
+ test "safely serializes BigDecimal arguments if configured to use_big_decimal_serializer" do
+ # BigDecimal(5) example should be moved back up into array above in Rails 7.2
+ with_big_decimal_serializer do
+ assert_arguments_unchanged BigDecimal(5)
+ end
+ end
+
+ [ Object.new, Person.find("5").to_gid, Class.new ].each do |arg|
test "does not serialize #{arg.class}" do
assert_raises ActiveJob::SerializationError do
ActiveJob::Arguments.serialize [ arg ]
@@ -75,6 +141,32 @@ class ClassArgument; end
)
end
+ # Regression test for #48561
+ test "serialize a class with permitted? defined" do
+ assert_arguments_unchanged MyClassWithPermitted
+ end
+
+ test "serialize a String subclass object" do
+ original_serializers = ActiveJob::Serializers.serializers
+ ActiveJob::Serializers.add_serializers(MyStringSerializer)
+
+ my_string = MyString.new("foo")
+ serialized = ActiveJob::Arguments.serialize([my_string])
+ deserialized = ActiveJob::Arguments.deserialize(JSON.load(JSON.dump(serialized))).first
+ assert_instance_of MyString, deserialized
+ assert_equal my_string, deserialized
+ ensure
+ ActiveJob::Serializers._additional_serializers = original_serializers
+ end
+
+ test "serialize a String subclass object without a serializer" do
+ string_without_serializer = StringWithoutSerializer.new("foo")
+ serialized = ActiveJob::Arguments.serialize([string_without_serializer])
+ deserialized = ActiveJob::Arguments.deserialize(JSON.load(JSON.dump(serialized))).first
+ assert_instance_of String, deserialized
+ assert_equal string_without_serializer, deserialized
+ end
+
test "serialize a hash" do
symbol_key = { a: 1 }
string_key = { "a" => 1 }
@@ -141,6 +233,12 @@ class ClassArgument; end
end
end
+ test "should maintain a functional duration" do
+ duration = perform_round_trip([1.year]).first
+ assert_kind_of Hash, duration.parts
+ assert_equal 2.years, duration + 1.year
+ end
+
test "should disallow non-string/symbol hash keys" do
assert_raises ActiveJob::SerializationError do
ActiveJob::Arguments.serialize [ { 1 => 2 } ]
@@ -195,6 +293,16 @@ def assert_arguments_roundtrip(args)
end
def perform_round_trip(args)
- ActiveJob::Arguments.deserialize(ActiveJob::Arguments.serialize(args))
+ ArgumentsRoundTripJob.perform_later(*args) # Actually performed inline
+
+ JobBuffer.last_value
+ end
+
+ def with_big_decimal_serializer(temporary = true)
+ original = ActiveJob.use_big_decimal_serializer
+ ActiveJob.use_big_decimal_serializer = temporary
+ yield
+ ensure
+ ActiveJob.use_big_decimal_serializer = original
end
end
diff --git a/activejob/test/cases/async_adapter_test.rb b/activejob/test/cases/async_adapter_test.rb
new file mode 100644
index 0000000000..de4df92816
--- /dev/null
+++ b/activejob/test/cases/async_adapter_test.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+require "helper"
+require "active_job/queue_adapters/async_adapter"
+require "jobs/hello_job"
+
+class AsyncAdapterTest < ActiveSupport::TestCase
+ setup do
+ JobBuffer.clear
+ ActiveJob::Base.queue_adapter.immediate = true
+ end
+
+ test "in immediate run, perform_later runs immediately" do
+ HelloJob.perform_later "Alex"
+ assert_match(/Alex/, JobBuffer.last_value)
+ end
+
+ test "in immediate run, enqueue with wait: runs immediately" do
+ HelloJob.set(wait_until: Date.tomorrow.noon).perform_later "Alex"
+ assert_match(/Alex/, JobBuffer.last_value)
+ end
+end
diff --git a/activejob/test/cases/callbacks_test.rb b/activejob/test/cases/callbacks_test.rb
index 655638db8d..af24fce458 100644
--- a/activejob/test/cases/callbacks_test.rb
+++ b/activejob/test/cases/callbacks_test.rb
@@ -54,145 +54,22 @@ def perform
assert_equal false, AbortBeforeEnqueueJob.new.enqueue
end
- test "#enqueue does not run after_enqueue callbacks when skip_after_callbacks_if_terminated is true" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = true
- reload_job
+ test "#enqueue does not run after_enqueue callbacks when previous callbacks aborted" do
job = AbortBeforeEnqueueJob.new
- ActiveSupport::Deprecation.silence do
- job.enqueue
- end
+ job.enqueue
assert_nil(job.flag)
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
- end
-
- test "#enqueue does run after_enqueue callbacks when skip_after_callbacks_if_terminated is false" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = false
- reload_job
- job = AbortBeforeEnqueueJob.new
- assert_deprecated(/`after_enqueue`\/`after_perform` callbacks no longer run/) do
- job.enqueue
- end
-
- assert_equal("after_enqueue", job.flag)
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
- end
-
- test "#enqueue does not throw a deprecation warning when skip_after_callbacks_if_terminated_is false but job has no after callbacks" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = false
-
- job = Class.new(ActiveJob::Base) do
- before_enqueue { throw(:abort) }
- end.new
-
- assert_not_deprecated do
- job.enqueue
- end
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
end
- test "#enqueue does not throw a deprecation warning when skip_after_callbacks_if_terminated_is false and job did not throw an abort" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = false
-
- job = Class.new(ActiveJob::Base) do
- after_enqueue { nil }
-
- around_enqueue do |_, block|
- block.call
- rescue ArgumentError
- nil
- end
-
- before_enqueue { raise ArgumentError }
- end
-
- assert_not_deprecated do
- job.perform_later
- end
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
- end
-
- test "#perform does not run after_perform callbacks when skip_after_callbacks_if_terminated is true" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = true
- reload_job
+ test "#perform does not run after_perform callbacks when previous callbacks aborted" do
job = AbortBeforeEnqueueJob.new
job.perform_now
assert_nil(job.flag)
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
- end
-
- test "#perform does run after_perform callbacks when skip_after_callbacks_if_terminated is false" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = false
- reload_job
- job = AbortBeforeEnqueueJob.new
- assert_deprecated(/`after_enqueue`\/`after_perform` callbacks no longer run/) do
- job.perform_now
- end
-
- assert_equal("after_perform", job.flag)
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
- end
-
- test "#perform does not throw a deprecation warning when skip_after_callbacks_if_terminated_is false but job has no after callbacks" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = false
-
- job = Class.new(ActiveJob::Base) do
- before_perform { throw(:abort) }
- end
-
- assert_not_deprecated do
- job.perform_now
- end
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
- end
-
- test "#perform does not throw a deprecation warning when skip_after_callbacks_if_terminated_is false and job did not throw an abort" do
- prev = ActiveJob::Base.skip_after_callbacks_if_terminated
- ActiveJob::Base.skip_after_callbacks_if_terminated = false
-
- job = Class.new(ActiveJob::Base) do
- after_perform { nil }
-
- around_perform do |_, block|
- block.call
- rescue ArgumentError
- nil
- end
-
- before_perform { raise ArgumentError }
- end
-
- assert_not_deprecated do
- job.perform_now
- end
- ensure
- ActiveJob::Base.skip_after_callbacks_if_terminated = prev
end
test "#enqueue returns self when the job was enqueued" do
job = CallbackJob.new
assert_equal job, job.enqueue
end
-
- private
- def reload_job
- Object.send(:remove_const, :AbortBeforeEnqueueJob)
- $LOADED_FEATURES.delete($LOADED_FEATURES.grep(%r{jobs/abort_before_enqueue_job}).first)
- require "jobs/abort_before_enqueue_job"
- end
end
diff --git a/activejob/test/cases/delayed_job_adapter_test.rb b/activejob/test/cases/delayed_job_adapter_test.rb
new file mode 100644
index 0000000000..2c7477fadf
--- /dev/null
+++ b/activejob/test/cases/delayed_job_adapter_test.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+require "active_job/queue_adapters/delayed_job_adapter"
+
+class DelayedJobAdapterTest < ActiveSupport::TestCase
+ test "does not log arguments when log_arguments is set to false on a job" do
+ job_id = SecureRandom.uuid
+
+ job_wrapper = ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper.new(
+ "job_class" => DisableLogJob.to_s,
+ "queue_name" => "default",
+ "job_id" => job_id,
+ "arguments" => { "some" => { "job" => "arguments" } }
+ )
+
+ assert_equal "DisableLogJob [#{job_id}] from DelayedJob(default)", job_wrapper.display_name
+ end
+
+ test "logs arguments when log_arguments is set to true on a job" do
+ job_id = SecureRandom.uuid
+ arguments = { "some" => { "job" => "arguments" } }
+
+ job_wrapper = ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper.new(
+ "job_class" => HelloJob.to_s,
+ "queue_name" => "default",
+ "job_id" => job_id,
+ "arguments" => arguments
+ )
+
+ assert_equal "HelloJob [#{job_id}] from DelayedJob(default) with arguments: #{arguments}",
+ job_wrapper.display_name
+ end
+
+ test "shows name for invalid job class" do
+ job_id = SecureRandom.uuid
+
+ job_wrapper = ActiveJob::QueueAdapters::DelayedJobAdapter::JobWrapper.new(
+ "job_class" => "NotExistingJob",
+ "queue_name" => "default",
+ "job_id" => job_id,
+ "arguments" => { "some" => { "job" => "arguments" } }
+ )
+
+ assert_equal "NotExistingJob [#{job_id}] from DelayedJob(default)", job_wrapper.display_name
+ end
+end
diff --git a/activejob/test/cases/exceptions_test.rb b/activejob/test/cases/exceptions_test.rb
index 5555b51b0a..08011e8b05 100644
--- a/activejob/test/cases/exceptions_test.rb
+++ b/activejob/test/cases/exceptions_test.rb
@@ -2,6 +2,7 @@
require "helper"
require "jobs/retry_job"
+require "jobs/after_discard_retry_job"
require "models/person"
require "minitest/mock"
@@ -33,15 +34,14 @@ class ExceptionsTest < ActiveSupport::TestCase
assert_raises SecondRetryableErrorOfTwo do
RetryJob.perform_later(exceptions_to_raise, 5)
-
- assert_equal [
- "Raised FirstRetryableErrorOfTwo for the 1st time",
- "Raised FirstRetryableErrorOfTwo for the 2nd time",
- "Raised FirstRetryableErrorOfTwo for the 3rd time",
- "Raised SecondRetryableErrorOfTwo for the 4th time",
- "Raised SecondRetryableErrorOfTwo for the 5th time",
- ], JobBuffer.values
end
+
+ assert_equal [
+ "Raised FirstRetryableErrorOfTwo for the 1st time",
+ "Raised FirstRetryableErrorOfTwo for the 2nd time",
+ "Raised FirstRetryableErrorOfTwo for the 3rd time",
+ "Raised SecondRetryableErrorOfTwo for the 4th time"
+ ], JobBuffer.values
end
test "keeps a separate attempts counter for each individual retry_on declaration" do
@@ -108,23 +108,23 @@ class ExceptionsTest < ActiveSupport::TestCase
end
end
- test "exponentially retrying job includes jitter" do
+ test "polynomially retrying job includes jitter" do
travel_to Time.now
random_amount = 2
delay_for_jitter = -> (delay) { random_amount * delay * ActiveJob::Base.retry_jitter }
Kernel.stub(:rand, random_amount) do
- RetryJob.perform_later "ExponentialWaitTenAttemptsError", 5, :log_scheduled_at
+ RetryJob.perform_later "PolynomialWaitTenAttemptsError", 5, :log_scheduled_at
assert_equal [
- "Raised ExponentialWaitTenAttemptsError for the 1st time",
+ "Raised PolynomialWaitTenAttemptsError for the 1st time",
"Next execution scheduled at #{(Time.now + 3.seconds + delay_for_jitter.(1)).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 2nd time",
+ "Raised PolynomialWaitTenAttemptsError for the 2nd time",
"Next execution scheduled at #{(Time.now + 18.seconds + delay_for_jitter.(16)).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 3rd time",
+ "Raised PolynomialWaitTenAttemptsError for the 3rd time",
"Next execution scheduled at #{(Time.now + 83.seconds + delay_for_jitter.(81)).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 4th time",
+ "Raised PolynomialWaitTenAttemptsError for the 4th time",
"Next execution scheduled at #{(Time.now + 258.seconds + delay_for_jitter.(256)).to_f}",
"Successfully completed job"
], JobBuffer.values
@@ -140,16 +140,16 @@ class ExceptionsTest < ActiveSupport::TestCase
random_amount = 1
Kernel.stub(:rand, random_amount) do
- RetryJob.perform_later "ExponentialWaitTenAttemptsError", 5, :log_scheduled_at
+ RetryJob.perform_later "PolynomialWaitTenAttemptsError", 5, :log_scheduled_at
assert_equal [
- "Raised ExponentialWaitTenAttemptsError for the 1st time",
+ "Raised PolynomialWaitTenAttemptsError for the 1st time",
"Next execution scheduled at #{(Time.now + 7.seconds).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 2nd time",
+ "Raised PolynomialWaitTenAttemptsError for the 2nd time",
"Next execution scheduled at #{(Time.now + 82.seconds).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 3rd time",
+ "Raised PolynomialWaitTenAttemptsError for the 3rd time",
"Next execution scheduled at #{(Time.now + 407.seconds).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 4th time",
+ "Raised PolynomialWaitTenAttemptsError for the 4th time",
"Next execution scheduled at #{(Time.now + 1282.seconds).to_f}",
"Successfully completed job"
], JobBuffer.values
@@ -175,16 +175,16 @@ class ExceptionsTest < ActiveSupport::TestCase
ActiveJob::Base.retry_jitter = old_jitter
end
- test "random wait time for exponentially retrying job when retry jitter delay multiplier value is between 1 and 2" do
+ test "random wait time for polynomially retrying job when retry jitter delay multiplier value is between 1 and 2" do
old_jitter = ActiveJob::Base.retry_jitter
ActiveJob::Base.retry_jitter = 1.2
travel_to Time.now
- RetryJob.perform_later "ExponentialWaitTenAttemptsError", 2, :log_scheduled_at
+ RetryJob.perform_later "PolynomialWaitTenAttemptsError", 2, :log_scheduled_at
assert_not_equal [
- "Raised ExponentialWaitTenAttemptsError for the 1st time",
+ "Raised PolynomialWaitTenAttemptsError for the 1st time",
"Next execution scheduled at #{(Time.now + 3.seconds).to_f}",
"Successfully completed job"
], JobBuffer.values
@@ -198,10 +198,10 @@ class ExceptionsTest < ActiveSupport::TestCase
travel_to Time.now
- RetryJob.perform_later "ExponentialWaitTenAttemptsError", 2, :log_scheduled_at
+ RetryJob.perform_later "PolynomialWaitTenAttemptsError", 2, :log_scheduled_at
assert_not_equal [
- "Raised ExponentialWaitTenAttemptsError for the 1st time",
+ "Raised PolynomialWaitTenAttemptsError for the 1st time",
"Next execution scheduled at #{(Time.now + 3.seconds).to_f}",
"Successfully completed job"
], JobBuffer.values
@@ -258,7 +258,7 @@ class ExceptionsTest < ActiveSupport::TestCase
test "use individual execution timers when calculating retry delay" do
travel_to Time.now
- exceptions_to_raise = %w(ExponentialWaitTenAttemptsError CustomWaitTenAttemptsError ExponentialWaitTenAttemptsError CustomWaitTenAttemptsError)
+ exceptions_to_raise = %w(PolynomialWaitTenAttemptsError CustomWaitTenAttemptsError PolynomialWaitTenAttemptsError CustomWaitTenAttemptsError)
random_amount = 1
@@ -268,11 +268,11 @@ class ExceptionsTest < ActiveSupport::TestCase
delay_for_jitter = -> (delay) { random_amount * delay * ActiveJob::Base.retry_jitter }
assert_equal [
- "Raised ExponentialWaitTenAttemptsError for the 1st time",
+ "Raised PolynomialWaitTenAttemptsError for the 1st time",
"Next execution scheduled at #{(Time.now + 3.seconds + delay_for_jitter.(1)).to_f}",
"Raised CustomWaitTenAttemptsError for the 2nd time",
"Next execution scheduled at #{(Time.now + 2.seconds).to_f}",
- "Raised ExponentialWaitTenAttemptsError for the 3rd time",
+ "Raised PolynomialWaitTenAttemptsError for the 3rd time",
"Next execution scheduled at #{(Time.now + 18.seconds + delay_for_jitter.(16)).to_f}",
"Raised CustomWaitTenAttemptsError for the 4th time",
"Next execution scheduled at #{(Time.now + 4.seconds).to_f}",
@@ -300,6 +300,14 @@ class ExceptionsTest < ActiveSupport::TestCase
assert_equal ["Raised ActiveJob::DeserializationError for the 5 time"], JobBuffer.values
end
+ test "successfully retry job throwing UnlimitedRetryError a few times" do
+ RetryJob.perform_later "UnlimitedRetryError", 10
+
+ assert_equal 10, JobBuffer.values.size
+ assert_equal "Raised UnlimitedRetryError for the 9th time", JobBuffer.values[8]
+ assert_equal "Successfully completed job", JobBuffer.values[9]
+ end
+
test "running a job enqueued by AJ 5.2" do
job = RetryJob.new("DefaultsError", 6)
job.exception_executions = nil # This is how jobs from Rails 5.2 will look
@@ -325,6 +333,67 @@ class ExceptionsTest < ActiveSupport::TestCase
assert_equal ["Raised DefaultsError for the 5th time"], JobBuffer.values
end
+ test "#after_discard block is run when an unhandled error is raised" do
+ assert_raises(AfterDiscardRetryJob::UnhandledError) do
+ AfterDiscardRetryJob.perform_later("AfterDiscardRetryJob::UnhandledError", 2)
+ end
+
+ assert_equal "Ran after_discard for job. Message: AfterDiscardRetryJob::UnhandledError", JobBuffer.last_value
+ end
+
+ test "#after_discard block is run when #retry_on is passed a block" do
+ AfterDiscardRetryJob.perform_later("AfterDiscardRetryJob::CustomCatchError", 6)
+
+ assert_equal "Ran after_discard for job. Message: AfterDiscardRetryJob::CustomCatchError", JobBuffer.last_value
+ end
+
+ test "#after_discard block is only run once when an error class and its superclass are handled by separate #retry_on calls" do
+ assert_raises(AfterDiscardRetryJob::ChildAfterDiscardError) do
+ AfterDiscardRetryJob.perform_later("AfterDiscardRetryJob::ChildAfterDiscardError", 6)
+ end
+ assert_equal ["Raised AfterDiscardRetryJob::ChildAfterDiscardError for the 5th time", "Ran after_discard for job. Message: AfterDiscardRetryJob::ChildAfterDiscardError"], JobBuffer.values.last(2)
+ end
+
+ test "#after_discard is run when a job is discarded via #discard_on" do
+ AfterDiscardRetryJob.perform_later("AfterDiscardRetryJob::DiscardableError", 2)
+
+ assert_equal "Ran after_discard for job. Message: AfterDiscardRetryJob::DiscardableError", JobBuffer.last_value
+ end
+
+ test "#after_discard is run when a job is discarded via #discard_on with a block passed to #discard_on" do
+ AfterDiscardRetryJob.perform_later("AfterDiscardRetryJob::CustomDiscardableError", 2)
+
+ expected_array = [
+ "Dealt with a job that was discarded in a custom way. Message: AfterDiscardRetryJob::CustomDiscardableError",
+ "Ran after_discard for job. Message: AfterDiscardRetryJob::CustomDiscardableError"
+ ]
+ assert_equal expected_array, JobBuffer.values.last(2)
+ end
+
+ class ::LegacyExponentialNamingError < StandardError; end
+ test "wait: :exponentially_longer is deprecated but still works" do
+ assert_deprecated(ActiveJob.deprecator) do
+ class LegacyRetryJob < RetryJob
+ retry_on LegacyExponentialNamingError, wait: :exponentially_longer, attempts: 10, jitter: nil
+ end
+ end
+
+ travel_to Time.now
+ LegacyRetryJob.perform_later "LegacyExponentialNamingError", 5, :log_scheduled_at
+
+ assert_equal [
+ "Raised LegacyExponentialNamingError for the 1st time",
+ "Next execution scheduled at #{(Time.now + 3.seconds).to_f}",
+ "Raised LegacyExponentialNamingError for the 2nd time",
+ "Next execution scheduled at #{(Time.now + 18.seconds).to_f}",
+ "Raised LegacyExponentialNamingError for the 3rd time",
+ "Next execution scheduled at #{(Time.now + 83.seconds).to_f}",
+ "Raised LegacyExponentialNamingError for the 4th time",
+ "Next execution scheduled at #{(Time.now + 258.seconds).to_f}",
+ "Successfully completed job"
+ ], JobBuffer.values
+ end
+
private
def adapter_skips_scheduling?(queue_adapter)
[
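Note on the expected delays in the retry tests above: the values 3, 18, 83 and 258 seconds are consistent with a delay of `executions**4 + 2`, with the jitter term `random * delay * jitter` added on top when jitter is enabled. The following is a minimal sketch of that arithmetic, inferred only from the test expectations in this diff. `polynomial_delay` is a hypothetical helper written for illustration and is not an Active Job method.

```ruby
# Hypothetical helper (not part of Active Job): reproduces the delays the
# tests above expect for polynomially growing retry waits.
#
# executions - how many times the job has run so far (1 for the first failure)
# jitter     - multiplier comparable to ActiveJob::Base.retry_jitter (assumed)
# random     - stand-in for the stubbed Kernel.rand value used in the tests
def polynomial_delay(executions, jitter: 0, random: 0)
  base = executions**4                  # 1, 16, 81, 256, ...
  jitter_delay = random * base * jitter # same shape as delay_for_jitter in the tests
  base + jitter_delay + 2               # constant 2 second offset
end

# Without jitter, as in the "wait: :exponentially_longer" test with jitter: nil.
delays = (1..4).map { |n| polynomial_delay(n) }
puts delays.inspect
```

The quartic growth is why the diff renames the error class from `ExponentialWaitTenAttemptsError` to `PolynomialWaitTenAttemptsError`: the wait grows as a power of the attempt count, not as a power of a fixed base.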
diff --git a/activejob/test/cases/instrumentation_test.rb b/activejob/test/cases/instrumentation_test.rb
new file mode 100644
index 0000000000..50102061c1
--- /dev/null
+++ b/activejob/test/cases/instrumentation_test.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+require "helper"
+require "jobs/hello_job"
+require "jobs/retry_job"
+
+class InstrumentationTest < ActiveSupport::TestCase
+ include ActiveJob::TestHelper
+
+ setup do
+ JobBuffer.clear
+ end
+
+ test "perform_now emits perform events" do
+ events = subscribed(/perform.*\.active_job/) { HelloJob.perform_now("World!") }
+ assert_equal 2, events.size
+ assert_equal "perform_start.active_job", events[0].first
+ assert_equal "perform.active_job", events[1].first
+ end
+
+ test "perform_later emits an enqueue event" do
+ events = subscribed("enqueue.active_job") { HelloJob.perform_later("World!") }
+ assert_equal 1, events.size
+ end
+
+ test "retry emits an enqueue retry event" do
+ events = subscribed("enqueue_retry.active_job") do
+ perform_enqueued_jobs { RetryJob.perform_later("DefaultsError", 2) }
+ end
+ assert_equal 1, events.size
+ end
+
+ test "retry exhaustion emits a retry_stopped event" do
+ events = subscribed("retry_stopped.active_job") do
+ perform_enqueued_jobs { RetryJob.perform_later("CustomCatchError", 6) }
+ end
+ assert_equal 1, events.size
+ end
+
+ test "discard emits a discard event" do
+ events = subscribed("discard.active_job") do
+ perform_enqueued_jobs { RetryJob.perform_later("DiscardableError", 2) }
+ end
+ assert_equal 1, events.size
+ end
+
+ def subscribed(name, &block)
+ [].tap do |events|
+ ActiveSupport::Notifications.subscribed(-> (*args) { events << args }, name, &block)
+ end
+ end
+end
diff --git a/activejob/test/cases/job_serialization_test.rb b/activejob/test/cases/job_serialization_test.rb
index 9f3cc6932a..f2b98958cd 100644
--- a/activejob/test/cases/job_serialization_test.rb
+++ b/activejob/test/cases/job_serialization_test.rb
@@ -65,14 +65,50 @@ class JobSerializationTest < ActiveSupport::TestCase
end
end
- test "serialize stores the enqueued_at time" do
- h1 = HelloJob.new
- type = h1.serialize["enqueued_at"].class
- assert_equal String, type
+ test "serializes and deserializes enqueued_at with full precision" do
+ freeze_time
- h2 = HelloJob.deserialize(h1.serialize)
- # We should be able to parse a timestamp
- type = Time.parse(h2.enqueued_at).class
- assert_equal Time, type
+ serialized = HelloJob.new.serialize
+ assert_kind_of String, serialized["enqueued_at"]
+
+ enqueued_at = HelloJob.deserialize(serialized).enqueued_at
+ assert_kind_of Time, enqueued_at
+ assert_equal Time.now.utc, enqueued_at
+ end
+
+ test "serializes and deserializes scheduled_at as Time" do
+ freeze_time
+ current_time = Time.now
+
+ job = HelloJob.new
+ job.scheduled_at = current_time
+ serialized_job = job.serialize
+ assert_kind_of String, serialized_job["scheduled_at"]
+ assert_equal current_time.utc.iso8601(9), serialized_job["scheduled_at"]
+
+ deserialized_job = HelloJob.new
+ deserialized_job.deserialize(serialized_job)
+ assert_equal current_time, deserialized_job.scheduled_at
+
+ assert_equal job.serialize, deserialized_job.serialize
+ end
+
+ test "deprecates and coerces numerical scheduled_at attribute to Time when serialized and deserialized" do
+ freeze_time
+ current_time = Time.now
+
+ job = HelloJob.new
+ assert_deprecated(ActiveJob.deprecator) do
+ job.scheduled_at = current_time.to_f
+ end
+
+ serialized_job = job.serialize
+ assert_kind_of String, serialized_job["scheduled_at"]
+ assert_equal current_time.utc.iso8601(9), serialized_job["scheduled_at"]
+
+ deserialized_job = HelloJob.new
+ deserialized_job.deserialize(serialized_job)
+ assert_equal current_time, deserialized_job.scheduled_at
+ assert_equal job.serialize, deserialized_job.serialize
end
end
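Note on `iso8601(9)` in the serialization tests above: nine fractional digits keep nanosecond precision, which is what lets the deserialized `Time` compare equal to the original. The following is a minimal sketch in plain Ruby using only the standard library. The timestamp is an arbitrary example and is not taken from the diff, and this does not reproduce Active Job's own serialization code.

```ruby
require "time"

# Arbitrary example timestamp with a nanosecond-precision fractional second.
original = Time.utc(2024, 1, 16, 12, 30, 45 + 123_456_789/1_000_000_000r)

# Serialize with nine fractional digits, then parse the string back.
serialized = original.iso8601(9)
restored = Time.iso8601(serialized)

# For comparison: serializing without fractional digits drops the nanoseconds.
truncated = Time.iso8601(original.iso8601)

puts serialized
puts restored == original
puts truncated == original
```

Comparing `restored` and `truncated` against `original` shows why a test asserting "full precision" needs the fractional digits in the serialized string.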
diff --git a/activejob/test/cases/logging_test.rb b/activejob/test/cases/logging_test.rb
index d38c640f43..b888e9c91e 100644
--- a/activejob/test/cases/logging_test.rb
+++ b/activejob/test/cases/logging_test.rb
@@ -11,6 +11,7 @@
require "jobs/retry_job"
require "jobs/disable_log_job"
require "jobs/abort_before_enqueue_job"
+require "jobs/enqueue_error_job"
require "models/person"
class LoggingTest < ActiveSupport::TestCase
@@ -49,11 +50,9 @@ def set_logger(logger)
ActiveJob::Base.logger = logger
end
- def subscribed
+ def subscribed(&block)
[].tap do |events|
- ActiveSupport::Notifications.subscribed(-> (*args) { events << args }, /enqueue.*\.active_job/) do
- yield
- end
+ ActiveSupport::Notifications.subscribed(-> (*args) { events << args }, /enqueue.*\.active_job/, &block)
end
end
@@ -108,17 +107,17 @@ def test_globalid_nested_parameter_logging
def test_enqueue_job_logging
events = subscribed { HelloJob.perform_later "Cristian" }
assert_match(/Enqueued HelloJob \(Job ID: .*?\) to .*?:.*Cristian/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue.active_job")
+ assert_equal("enqueue.active_job", key)
end
def test_enqueue_job_log_error_when_callback_chain_is_halted
events = subscribed { AbortBeforeEnqueueJob.perform_later }
assert_match(/Failed enqueuing AbortBeforeEnqueueJob.* a before_enqueue callback halted/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue.active_job")
+ assert_equal("enqueue.active_job", key)
end
def test_enqueue_job_log_error_when_error_is_raised_during_callback_chain
@@ -129,9 +128,9 @@ def test_enqueue_job_log_error_when_error_is_raised_during_callback_chain
end
assert_match(/Failed enqueuing AbortBeforeEnqueueJob/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue.active_job")
+ assert_equal("enqueue.active_job", key)
end
def test_perform_job_logging
@@ -145,6 +144,15 @@ def test_perform_job_logging
end
end
+ def test_perform_job_logging_when_job_is_not_enqueued
+ perform_enqueued_jobs do
+ LoggingJob.perform_now "Dummy"
+
+ assert_match(/Performing LoggingJob \(Job ID: .*?\) from .*? with arguments:.*Dummy/, @logger.messages)
+ assert_no_match(/enqueued at /, @logger.messages)
+ end
+ end
+
def test_perform_job_log_error_when_callback_chain_is_halted
subscribed { AbortBeforeEnqueueJob.perform_now }
assert_match(/Error performing AbortBeforeEnqueueJob.* a before_perform callback halted/, @logger.messages)
@@ -208,9 +216,9 @@ def test_perform_nested_jobs_logging
def test_enqueue_at_job_logging
events = subscribed { HelloJob.set(wait_until: 24.hours.from_now).perform_later "Cristian" }
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue_at.active_job")
+ assert_equal("enqueue_at.active_job", key)
rescue NotImplementedError
skip
end
@@ -218,9 +226,9 @@ def test_enqueue_at_job_logging
def test_enqueue_at_job_log_error_when_callback_chain_is_halted
events = subscribed { AbortBeforeEnqueueJob.set(wait: 1.second).perform_later }
assert_match(/Failed enqueuing AbortBeforeEnqueueJob.* a before_enqueue callback halted/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue_at.active_job")
+ assert_equal("enqueue_at.active_job", key)
end
def test_enqueue_at_job_log_error_when_error_is_raised_during_callback_chain
@@ -231,21 +239,35 @@ def test_enqueue_at_job_log_error_when_error_is_raised_during_callback_chain
end
assert_match(/Failed enqueuing AbortBeforeEnqueueJob/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue_at.active_job")
+ assert_equal("enqueue_at.active_job", key)
end
def test_enqueue_in_job_logging
events = subscribed { HelloJob.set(wait: 2.seconds).perform_later "Cristian" }
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
- assert_equal(events.count, 1)
+ assert_equal(1, events.count)
key, * = events.first
- assert_equal(key, "enqueue_at.active_job")
+ assert_equal("enqueue_at.active_job", key)
rescue NotImplementedError
skip
end
+ def test_enqueue_log_when_enqueue_error_is_set
+ EnqueueErrorJob.disable_test_adapter
+
+ EnqueueErrorJob.perform_later
+ assert_match(/Failed enqueuing EnqueueErrorJob to EnqueueError\(default\): ActiveJob::EnqueueError \(There was an error enqueuing the job\)/, @logger.messages)
+ end
+
+ def test_enqueue_at_log_when_enqueue_error_is_set
+ EnqueueErrorJob.disable_test_adapter
+
+ EnqueueErrorJob.set(wait: 1.hour).perform_later
+ assert_match(/Failed enqueuing EnqueueErrorJob to EnqueueError\(default\): ActiveJob::EnqueueError \(There was an error enqueuing the job\)/, @logger.messages)
+ end
+
def test_for_tagged_logger_support_is_consistent
set_logger ::Logger.new(nil)
OverriddenLoggingJob.perform_later "Dummy"
@@ -260,37 +282,84 @@ def test_job_error_logging
end
end
+ def test_job_no_error_logging_on_rescuable_job
+ perform_enqueued_jobs { RescueJob.perform_later "david" }
+ assert_match(/Performing RescueJob \(Job ID: .*?\) from .*? with arguments:.*david/, @logger.messages)
+ assert_no_match(/Error performing RescueJob \(Job ID: .*?\) from .*? in .*ms: ArgumentError \(Hair too good\):\n.*\brescue_job\.rb:\d+:in `perform'/, @logger.messages)
+ end
+
def test_enqueue_retry_logging
perform_enqueued_jobs do
RetryJob.perform_later "DefaultsError", 2
- assert_match(/Retrying RetryJob in 3 seconds, due to a DefaultsError\./, @logger.messages)
+ assert_match(/Retrying RetryJob \(Job ID: .*?\) after \d+ attempts in 3 seconds, due to a DefaultsError.*\./, @logger.messages)
end
end
def test_enqueue_retry_logging_on_retry_job
perform_enqueued_jobs { RescueJob.perform_later "david" }
- assert_match(/Retrying RescueJob in 0 seconds\./, @logger.messages)
+ assert_match(/Retrying RescueJob \(Job ID: .*?\) after \d+ attempts in 0 seconds\./, @logger.messages)
end
def test_retry_stopped_logging
perform_enqueued_jobs do
RetryJob.perform_later "CustomCatchError", 6
- assert_match(/Stopped retrying RetryJob due to a CustomCatchError, which reoccurred on \d+ attempts\./, @logger.messages)
end
+ assert_match(/Stopped retrying RetryJob \(Job ID: .*?\) due to a CustomCatchError.*, which reoccurred on \d+ attempts\./, @logger.messages)
end
def test_retry_stopped_logging_without_block
perform_enqueued_jobs do
RetryJob.perform_later "DefaultsError", 6
rescue DefaultsError
- assert_match(/Stopped retrying RetryJob due to a DefaultsError, which reoccurred on \d+ attempts\./, @logger.messages)
+ assert_match(/Stopped retrying RetryJob \(Job ID: .*?\) due to a DefaultsError.*, which reoccurred on \d+ attempts\./, @logger.messages)
end
end
def test_discard_logging
perform_enqueued_jobs do
RetryJob.perform_later "DiscardableError", 2
- assert_match(/Discarded RetryJob due to a DiscardableError\./, @logger.messages)
+ assert_match(/Discarded RetryJob \(Job ID: .*?\) due to a DiscardableError.*\./, @logger.messages)
end
end
+
+ def test_enqueue_all_job_logging_some_jobs_failed_enqueuing
+ EnqueueErrorJob.disable_test_adapter
+
+ EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [false, true]
+
+ ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new)
+ assert_match(/Enqueued 1 job to .+ \(1 EnqueueErrorJob\)\. Failed enqueuing 1 job/, @logger.messages)
+ ensure
+ EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = []
+ end
+
+ def test_enqueue_all_job_logging_all_jobs_failed_enqueuing
+ EnqueueErrorJob.disable_test_adapter
+
+ EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [true, true]
+
+ ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new)
+ assert_match(/Failed enqueuing 2 jobs to .+/, @logger.messages)
+ ensure
+ EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = []
+ end
+
+ def test_verbose_enqueue_logs
+ ActiveJob.verbose_enqueue_logs = true
+
+ LoggingJob.perform_later "Dummy"
+ assert_match("↳", @logger.messages)
+ ensure
+ ActiveJob.verbose_enqueue_logs = false
+ end
+
+ def test_verbose_enqueue_logs_disabled_by_default
+ LoggingJob.perform_later "Dummy"
+ assert_no_match("↳", @logger.messages)
+ end
+
+ def test_enqueue_all_job_logging
+ ActiveJob.perform_all_later(LoggingJob.new("Dummy"), HelloJob.new("Jamie"), HelloJob.new("John"))
+ assert_match(/Enqueued 3 jobs to .+ \(2 HelloJob, 1 LoggingJob\)/, @logger.messages)
+ end
end
diff --git a/activejob/test/cases/queue_adapter_test.rb b/activejob/test/cases/queue_adapter_test.rb
index 3b5de604bc..ac09cdab73 100644
--- a/activejob/test/cases/queue_adapter_test.rb
+++ b/activejob/test/cases/queue_adapter_test.rb
@@ -48,6 +48,39 @@ class QueueAdapterTest < ActiveJob::TestCase
child_job_three = Class.new(ActiveJob::Base)
- assert_not_nil child_job_three.queue_adapter
+ assert_equal base_queue_adapter, child_job_three.queue_adapter, "child_job_three's queue adapter should remain unchanged"
+ end
+
+ test "should extract a reasonable name from a class instance" do
+ child_job = Class.new(ActiveJob::Base)
+ child_job.queue_adapter = ActiveJob::QueueAdapters::StubOneAdapter.new
+ assert_equal "stub_one", child_job.queue_adapter_name
+ end
+
+ module StubThreeAdapter
+ class << self
+ def enqueue(*); end
+ def enqueue_at(*); end
+ end
+ end
+
+ test "should extract a reasonable name from a class or module" do
+ child_job = Class.new(ActiveJob::Base)
+ child_job.queue_adapter = StubThreeAdapter
+ assert_equal "stub_three", child_job.queue_adapter_name
+ end
+
+ class StubFourAdapter
+ def enqueue(*); end
+ def enqueue_at(*); end
+ def queue_adapter_name
+ "fancy_name"
+ end
+ end
+
+ test "should use the name provided by the adapter" do
+ child_job = Class.new(ActiveJob::Base)
+ child_job.queue_adapter = StubFourAdapter.new
+ assert_equal "fancy_name", child_job.queue_adapter_name
end
end
diff --git a/activejob/test/cases/queue_naming_test.rb b/activejob/test/cases/queue_naming_test.rb
index 2b83a72d5c..943368b7c0 100644
--- a/activejob/test/cases/queue_naming_test.rb
+++ b/activejob/test/cases/queue_naming_test.rb
@@ -1,12 +1,17 @@
# frozen_string_literal: true
require "helper"
+require "jobs/configuration_job"
require "jobs/hello_job"
require "jobs/prefixed_job"
require "jobs/logging_job"
require "jobs/nested_job"
class QueueNamingTest < ActiveSupport::TestCase
+ setup do
+ JobBuffer.clear
+ end
+
test "name derived from base" do
assert_equal "default", HelloJob.new.queue_name
end
@@ -143,8 +148,15 @@ class QueueNamingTest < ActiveSupport::TestCase
end
end
- test "uses queue passed to #set" do
- job = HelloJob.set(queue: :some_queue).perform_later
+ test "is assigned when perform_now" do
+ ConfigurationJob.set(queue: :some_queue).perform_now
+ job = JobBuffer.last_value
+ assert_equal "some_queue", job.queue_name
+ end
+
+ test "is assigned when perform_later" do
+ ConfigurationJob.set(queue: :some_queue).perform_later
+ job = JobBuffer.last_value
assert_equal "some_queue", job.queue_name
end
end
diff --git a/activejob/test/cases/queue_priority_test.rb b/activejob/test/cases/queue_priority_test.rb
index 4b3006ae81..0751d24fc4 100644
--- a/activejob/test/cases/queue_priority_test.rb
+++ b/activejob/test/cases/queue_priority_test.rb
@@ -1,9 +1,14 @@
# frozen_string_literal: true
require "helper"
+require "jobs/configuration_job"
require "jobs/hello_job"
class QueuePriorityTest < ActiveSupport::TestCase
+ setup do
+ JobBuffer.clear
+ end
+
test "priority unset by default" do
assert_nil HelloJob.priority
end
@@ -42,8 +47,15 @@ class QueuePriorityTest < ActiveSupport::TestCase
end
end
- test "uses priority passed to #set" do
- job = HelloJob.set(priority: 123).perform_later
+ test "is assigned when perform_now" do
+ ConfigurationJob.set(priority: 123).perform_now
+ job = JobBuffer.last_value
+ assert_equal 123, job.priority
+ end
+
+ test "is assigned when perform_later" do
+ ConfigurationJob.set(priority: 123).perform_later
+ job = JobBuffer.last_value
assert_equal 123, job.priority
end
end
diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb
index e7bad83400..2413987df3 100644
--- a/activejob/test/cases/queuing_test.rb
+++ b/activejob/test/cases/queuing_test.rb
@@ -2,7 +2,10 @@
require "helper"
require "jobs/hello_job"
+require "jobs/enqueue_error_job"
+require "jobs/multiple_kwargs_job"
require "active_support/core_ext/numeric/time"
+require "minitest/mock"
class QueuingTest < ActiveSupport::TestCase
setup do
@@ -33,8 +36,72 @@ class QueuingTest < ActiveSupport::TestCase
test "job returned by perform_at has the timestamp available" do
job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later
- assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at
+ assert_equal Time.utc(2014, 1, 1), job.scheduled_at
rescue NotImplementedError
skip
end
+
+ test "job is yielded to block after enqueue with successfully_enqueued property set" do
+ HelloJob.perform_later "John" do |job|
+ assert_equal "John says hello", JobBuffer.last_value
+ assert_equal [ "John" ], job.arguments
+ assert_equal true, job.successfully_enqueued?
+ assert_nil job.enqueue_error
+ end
+ end
+
+ test "when enqueuing raises an EnqueueError job is yielded to block with error set on job" do
+ EnqueueErrorJob.perform_later do |job|
+ assert_equal false, job.successfully_enqueued?
+ assert_equal ActiveJob::EnqueueError, job.enqueue_error.class
+ end
+ end
+
+ test "run multiple queued jobs" do
+ ActiveJob.perform_all_later(HelloJob.new("Jamie"), HelloJob.new("John"))
+ assert_equal ["Jamie says hello", "John says hello"], JobBuffer.values.sort
+ end
+
+ test "run multiple queued jobs passed as array" do
+ ActiveJob.perform_all_later([HelloJob.new("Jamie"), HelloJob.new("John")])
+ assert_equal ["Jamie says hello", "John says hello"], JobBuffer.values.sort
+ end
+
+ test "run multiple queued jobs of different classes" do
+ ActiveJob.perform_all_later([HelloJob.new("Jamie"), MultipleKwargsJob.new(argument1: "John", argument2: 42)])
+ assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort
+ end
+
+ test "perform_all_later enqueues jobs with schedules" do
+ scheduled_job_1 = HelloJob.new("Scheduled 2014")
+ scheduled_job_1.set(wait_until: Time.utc(2014, 1, 1))
+
+ scheduled_job_2 = HelloJob.new("Scheduled 2015")
+ scheduled_job_2.scheduled_at = Time.utc(2015, 1, 1)
+
+ ActiveJob.perform_all_later(scheduled_job_1, scheduled_job_2)
+ assert_equal ["Scheduled 2014 says hello", "Scheduled 2015 says hello"], JobBuffer.values.sort
+ rescue NotImplementedError
+ skip
+ end
+
+ test "perform_all_later instrumentation" do
+ jobs = HelloJob.new("Jamie"), HelloJob.new("John")
+ called = false
+
+ subscriber = lambda do |*args|
+ called = true
+ event = ActiveSupport::Notifications::Event.new(*args)
+ payload = event.payload
+ assert payload[:adapter]
+ assert_equal jobs, payload[:jobs]
+ assert_equal 2, payload[:enqueued_count]
+ end
+
+ ActiveSupport::Notifications.subscribed(subscriber, "enqueue_all.active_job") do
+ ActiveJob.perform_all_later(jobs)
+ end
+
+ assert called
+ end
end
diff --git a/activejob/test/cases/rescue_test.rb b/activejob/test/cases/rescue_test.rb
index f4a6a3a065..56cdbd61a8 100644
--- a/activejob/test/cases/rescue_test.rb
+++ b/activejob/test/cases/rescue_test.rb
@@ -33,4 +33,9 @@ class RescueTest < ActiveSupport::TestCase
RescueJob.perform_later [Person.new(404)]
assert_includes JobBuffer.values, "DeserializationError original exception was Person::RecordNotFound"
end
+
+ test "rescue from exceptions that don't inherit from StandardError" do
+ RescueJob.perform_later("rafael")
+ assert_equal ["rescued from NotImplementedError"], JobBuffer.values
+ end
end
diff --git a/activejob/test/cases/serializers_test.rb b/activejob/test/cases/serializers_test.rb
index 2b9ba75ae0..cf18db81d1 100644
--- a/activejob/test/cases/serializers_test.rb
+++ b/activejob/test/cases/serializers_test.rb
@@ -77,7 +77,7 @@ def klass
)
end
- test "will deserialize know serialized objects" do
+ test "will deserialize known serialized objects" do
ActiveJob::Serializers.add_serializers DummySerializer
hash = { "_aj_serialized" => "SerializersTest::DummySerializer", "value" => 123 }
assert_equal DummyValueObject.new(123), ActiveJob::Serializers.deserialize(hash)
diff --git a/activejob/test/cases/test_case_test.rb b/activejob/test/cases/test_case_test.rb
index 4ae2add3a8..94d0e29f92 100644
--- a/activejob/test/cases/test_case_test.rb
+++ b/activejob/test/cases/test_case_test.rb
@@ -22,4 +22,8 @@ def test_include_helper
def test_set_test_adapter
assert_kind_of ActiveJob::QueueAdapters::TestAdapter, queue_adapter
end
+
+ def test_does_not_perform_enqueued_jobs_by_default
+ assert_nil queue_adapter.perform_enqueued_jobs
+ end
end
diff --git a/activejob/test/cases/test_helper_test.rb b/activejob/test/cases/test_helper_test.rb
index f7d5a4e3f6..f206b98ab1 100644
--- a/activejob/test/cases/test_helper_test.rb
+++ b/activejob/test/cases/test_helper_test.rb
@@ -3,11 +3,13 @@
require "helper"
require "active_support/core_ext/time"
require "active_support/core_ext/date"
+require "zeitwerk"
require "jobs/hello_job"
require "jobs/logging_job"
require "jobs/nested_job"
require "jobs/rescue_job"
require "jobs/raising_job"
+require "jobs/retry_job"
require "jobs/inherited_job"
require "jobs/multiple_kwargs_job"
require "models/person"
@@ -200,6 +202,18 @@ def test_assert_enqueued_jobs_with_queue_option
end
end
+ def test_assert_enqueued_job_with_priority_option
+ assert_enqueued_with(job: HelloJob, priority: 10) do
+ HelloJob.set(priority: 10).perform_later
+ end
+
+ assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_with(job: HelloJob, priority: 10) do
+ HelloJob.set(priority: 5).perform_later
+ end
+ end
+ end
+
def test_assert_enqueued_jobs_with_only_option_and_none_sent
error = assert_raise ActiveSupport::TestCase::Assertion do
assert_enqueued_jobs 1, only: HelloJob do
@@ -495,13 +509,25 @@ def test_assert_enqueued_with_with_no_block
assert_enqueued_with(job: LoggingJob, queue: "default")
end
+ def test_assert_enqueued_with_when_queue_name_is_symbol
+ assert_enqueued_with(job: LoggingJob, queue: :default) do
+ LoggingJob.set(wait_until: Date.tomorrow.noon).perform_later
+ end
+ end
+
+ def test_assert_no_enqueued_jobs_and_perform_now
+ assert_no_enqueued_jobs do
+ LoggingJob.perform_now(1, 2, 3, keyword: true)
+ end
+ end
+
def test_assert_enqueued_with_returns
job = assert_enqueued_with(job: LoggingJob) do
LoggingJob.set(wait_until: 5.minutes.from_now).perform_later(1, 2, 3, keyword: true)
end
assert_instance_of LoggingJob, job
- assert_in_delta 5.minutes.from_now, job.scheduled_at, 1
+ assert_in_delta 5.minutes.from_now.to_f, job.scheduled_at.to_f, 1
assert_equal "default", job.queue_name
assert_equal [1, 2, 3, { keyword: true }], job.arguments
end
@@ -511,7 +537,7 @@ def test_assert_enqueued_with_with_no_block_returns
job = assert_enqueued_with(job: LoggingJob)
assert_instance_of LoggingJob, job
- assert_in_delta 5.minutes.from_now, job.scheduled_at, 1
+ assert_in_delta 5.minutes.from_now.to_f, job.scheduled_at.to_f, 1
assert_equal "default", job.queue_name
assert_equal [1, 2, 3, { keyword: true }], job.arguments
end
@@ -685,8 +711,33 @@ def test_assert_enqueued_with_failure_with_global_id_args
HelloJob.perform_later(ricardo)
end
end
+
assert_match(/No enqueued job found with {:job=>HelloJob, :args=>\[#{wilma.inspect}\]}/, error.message)
- assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id=\"9\"\>\], :queue=>\"default\"}/, error.message)
+ assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id="9">\], :queue=>"default".*?}/, error.message)
+ end
+
+ def test_show_jobs_that_are_enqueued_when_job_is_not_queued_at_all
+ ricardo = Person.new(9)
+ wilma = Person.new(11)
+
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_with(job: MultipleKwargsJob, args: [wilma]) do
+ HelloJob.perform_later(ricardo)
+ end
+ end
+
+ assert_match(/No enqueued job found with {:job=>MultipleKwargsJob, :args=>\[#{wilma.inspect}\]}/, error.message)
+ assert_match(/No jobs of class MultipleKwargsJob were enqueued, job classes enqueued: HelloJob/, error.message)
+ end
+
+ def test_shows_no_jobs_enqueued_when_there_are_no_jobs
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_enqueued_with(job: HelloJob, args: []) do
+ end
+ end
+
+ assert_match(/No enqueued job found with {:job=>HelloJob, :args=>\[\]}/, error.message)
+ assert_match(/No jobs were enqueued/, error.message)
end
def test_assert_enqueued_with_failure_with_no_block_with_global_id_args
@@ -698,7 +749,7 @@ def test_assert_enqueued_with_failure_with_no_block_with_global_id_args
end
assert_match(/No enqueued job found with {:job=>HelloJob, :args=>\[#{wilma.inspect}\]}/, error.message)
- assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id=\"9\"\>\], :queue=>\"default\"}/, error.message)
+ assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id="9">\], :queue=>"default".*?}/, error.message)
end
def test_assert_enqueued_with_does_not_change_jobs_count
@@ -1740,6 +1791,12 @@ def test_assert_performed_with_without_block
assert_performed_with(job: NestedJob, queue: "default")
end
+ def test_assert_performed_with_when_queue_name_is_symbol
+ assert_performed_with(job: NestedJob, queue: :default) do
+ NestedJob.perform_later
+ end
+ end
+
def test_assert_performed_with_returns
job = assert_performed_with(job: LoggingJob, queue: "default") do
LoggingJob.perform_later(keyword: :sym)
@@ -1794,6 +1851,18 @@ def test_assert_performed_with_without_block_failure
end
end
+ def test_assert_performed_job_with_priority_option
+ assert_performed_with(job: HelloJob, priority: 10) do
+ HelloJob.set(priority: 10).perform_later
+ end
+
+ assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_with(job: HelloJob, priority: 10) do
+ HelloJob.set(priority: 5).perform_later
+ end
+ end
+ end
+
def test_assert_performed_with_with_at_option
assert_performed_with(job: HelloJob, at: Date.tomorrow.noon) do
HelloJob.set(wait_until: Date.tomorrow.noon).perform_later
@@ -1913,7 +1982,7 @@ def test_assert_performed_with_failure_with_global_id_args
end
end
assert_match(/No performed job found with {:job=>HelloJob, :args=>\[#{wilma.inspect}\]}/, error.message)
- assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id=\"9\"\>\], :queue=>\"default\"}/, error.message)
+ assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id="9">\], :queue=>"default".*?}/, error.message)
end
def test_assert_performed_with_without_block_failure_with_global_id_args
@@ -1926,7 +1995,29 @@ def test_assert_performed_with_without_block_failure_with_global_id_args
end
assert_match(/No performed job found with {:job=>HelloJob, :args=>\[#{wilma.inspect}\]}/, error.message)
- assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id=\"9\"\>\], :queue=>\"default\"}/, error.message)
+ assert_match(/Potential matches: {.*?:job=>HelloJob, :args=>\[#<Person.* @id="9">\], :queue=>"default".*?}/, error.message)
+ end
+
+ def test_assert_performed_says_no_jobs_performed
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_with(job: HelloJob, args: [])
+ end
+
+ assert_match(/No performed job found with {:job=>HelloJob, :args=>\[\]}/, error.message)
+ assert_match(/No jobs were performed/, error.message)
+ end
+
+ def test_assert_performed_when_not_matching_the_class_shows_alteratives
+ ricardo = Person.new(9)
+ wilma = Person.new(11)
+ HelloJob.perform_later(ricardo)
+ perform_enqueued_jobs
+ error = assert_raise ActiveSupport::TestCase::Assertion do
+ assert_performed_with(job: MultipleKwargsJob, args: [wilma])
+ end
+
+ assert_match(/No performed job found with {:job=>MultipleKwargsJob, :args=>\[#<Person.* @id=11>\]}/, error.message)
+ assert_match(/No jobs of class MultipleKwargsJob were performed, job classes performed: HelloJob/, error.message)
end
def test_assert_performed_with_does_not_change_jobs_count
@@ -1955,6 +2046,14 @@ def test_assert_performed_with_without_block_does_not_change_jobs_count
assert_equal 2, queue_adapter.performed_jobs.count
end
+ test "perform_enqueued_jobs doesn't raise if discard_on ActiveJob::DeserializationError" do
+ RetryJob.perform_later Person.new(404), 1
+
+ assert_nothing_raised do
+ perform_enqueued_jobs(only: RetryJob)
+ end
+ end
+
test "TestAdapter respect max attempts" do
perform_enqueued_jobs(only: RaisingJob) do
assert_raises(RaisingJob::MyError) do
@@ -1966,6 +2065,20 @@ def test_assert_performed_with_without_block_does_not_change_jobs_count
end
end
+class AdapterIsNotTestAdapterTest < ActiveJob::TestCase
+ def queue_adapter_for_test
+ ActiveJob::QueueAdapters::InlineAdapter.new
+ end
+
+ def test_perform_enqueued_jobs_just_yields
+ JobBuffer.clear
+ perform_enqueued_jobs do
+ HelloJob.perform_later("kevin")
+ end
+ assert_equal(1, JobBuffer.values.size)
+ end
+end
+
class OverrideQueueAdapterTest < ActiveJob::TestCase
class CustomQueueAdapter < ActiveJob::QueueAdapters::TestAdapter; end
@@ -1985,18 +2098,13 @@ def test_queue_adapter_is_test_adapter
end
class QueueAdapterJobTest < ActiveJob::TestCase
- def before_setup
- @original_autoload_paths = ActiveSupport::Dependencies.autoload_paths
- ActiveSupport::Dependencies.autoload_paths = %w(test/jobs)
- super
- end
-
- def after_teardown
- ActiveSupport::Dependencies.autoload_paths = @original_autoload_paths
- super
- end
-
def test_queue_adapter_is_test_adapter
- assert_instance_of ActiveJob::QueueAdapters::TestAdapter, QueueAdapterJob.queue_adapter
+ Zeitwerk.with_loader do |loader|
+ loader.push_dir("test/jobs")
+ loader.setup
+ assert_instance_of ActiveJob::QueueAdapters::TestAdapter, QueueAdapterJob.queue_adapter
+ ensure
+ loader.unload
+ end
end
end
diff --git a/activejob/test/helper.rb b/activejob/test/helper.rb
index d7925aacce..65baecf800 100644
--- a/activejob/test/helper.rb
+++ b/activejob/test/helper.rb
@@ -1,5 +1,8 @@
# frozen_string_literal: true
+require_relative "../../tools/test_common"
+
+require "active_support/testing/strict_warnings"
require "active_job"
require "support/job_buffer"
@@ -12,10 +15,11 @@
require "support/integration/helper"
else
ActiveJob::Base.logger = Logger.new(nil)
- ActiveJob::Base.skip_after_callbacks_if_terminated = true
require "adapters/#{@adapter}"
end
require "active_support/testing/autorun"
-require_relative "../../tools/test_common"
+def adapter_is?(*adapter_class_symbols)
+ adapter_class_symbols.map(&:to_s).include? ActiveJob::Base.queue_adapter_name
+end
diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb
index 933d310c16..d7a245a893 100644
--- a/activejob/test/integration/queuing_test.rb
+++ b/activejob/test/integration/queuing_test.rb
@@ -10,7 +10,7 @@ class QueuingTest < ActiveSupport::TestCase
test "should run jobs enqueued on a listening queue" do
TestJob.perform_later @id
wait_for_jobs_to_finish_for(5.seconds)
- assert job_executed
+ assert_job_executed
end
test "should not run jobs queued on a non-listening queue" do
@@ -21,7 +21,7 @@ class QueuingTest < ActiveSupport::TestCase
TestJob.queue_as :some_other_queue
TestJob.perform_later @id
wait_for_jobs_to_finish_for(2.seconds)
- assert_not job_executed
+ assert_job_not_executed
ensure
TestJob.queue_name = old_queue
end
@@ -62,7 +62,7 @@ class QueuingTest < ActiveSupport::TestCase
test "should not run job enqueued in the future" do
TestJob.set(wait: 10.minutes).perform_later @id
wait_for_jobs_to_finish_for(5.seconds)
- assert_not job_executed
+ assert_job_not_executed
rescue NotImplementedError
skip
end
@@ -70,21 +70,31 @@ class QueuingTest < ActiveSupport::TestCase
test "should run job enqueued in the future at the specified time" do
TestJob.set(wait: 5.seconds).perform_later @id
wait_for_jobs_to_finish_for(2.seconds)
- assert_not job_executed
+ assert_job_not_executed
wait_for_jobs_to_finish_for(10.seconds)
- assert job_executed
+ assert_job_executed
+ rescue NotImplementedError
+ skip
+ end
+
+ test "should run job bulk enqueued in the future at the specified time" do
+ ActiveJob.perform_all_later([TestJob.new(@id).set(wait: 5.seconds)])
+ wait_for_jobs_to_finish_for(2.seconds)
+ assert_job_not_executed
+ wait_for_jobs_to_finish_for(10.seconds)
+ assert_job_executed
rescue NotImplementedError
skip
end
test "should supply a provider_job_id when available for immediate jobs" do
- skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
+ skip unless adapter_is?(:async, :delayed_job, :sidekiq, :queue_classic)
test_job = TestJob.perform_later @id
assert test_job.provider_job_id, "Provider job id should be set by provider"
end
test "should supply a provider_job_id when available for delayed jobs" do
- skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
+ skip unless adapter_is?(:async, :delayed_job, :sidekiq, :queue_classic)
delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id
assert delayed_test_job.provider_job_id, "Provider job id should be set for delayed jobs by provider"
end
@@ -98,7 +108,7 @@ class QueuingTest < ActiveSupport::TestCase
TestJob.perform_later @id
wait_for_jobs_to_finish_for(5.seconds)
- assert job_executed
+ assert_job_executed
assert_equal "de", job_executed_in_locale
ensure
I18n.available_locales = [:en]
@@ -115,7 +125,7 @@ class QueuingTest < ActiveSupport::TestCase
TestJob.perform_later @id
wait_for_jobs_to_finish_for(5.seconds)
- assert job_executed
+ assert_job_executed
assert_equal "Hawaii", job_executed_in_timezone
ensure
Time.zone = current_zone
@@ -123,15 +133,15 @@ class QueuingTest < ActiveSupport::TestCase
end
test "should run job with higher priority first" do
- skip unless adapter_is?(:delayed_job, :que)
+ skip unless adapter_is?(:delayed_job)
wait_until = Time.now + 3.seconds
TestJob.set(wait_until: wait_until, priority: 20).perform_later "#{@id}.1"
TestJob.set(wait_until: wait_until, priority: 10).perform_later "#{@id}.2"
wait_for_jobs_to_finish_for(10.seconds)
- assert job_executed "#{@id}.1"
- assert job_executed "#{@id}.2"
- assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
+ assert_job_executed "#{@id}.1"
+ assert_job_executed "#{@id}.2"
+ assert_job_executed_before("#{@id}.2", "#{@id}.1")
end
test "should run job with higher priority first in Backburner" do
@@ -141,8 +151,21 @@ class QueuingTest < ActiveSupport::TestCase
TestJob.set(priority: 20).perform_later "#{@id}.1"
TestJob.set(priority: 10).perform_later "#{@id}.2"
wait_for_jobs_to_finish_for(10.seconds)
- assert job_executed "#{@id}.1"
- assert job_executed "#{@id}.2"
- assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
+ assert_job_executed "#{@id}.1"
+ assert_job_executed "#{@id}.2"
+ assert_job_executed_before("#{@id}.2", "#{@id}.1")
end
+
+ private
+ def assert_job_executed(id = @id)
+ assert job_executed(id), "Job #{id} was not executed"
+ end
+
+ def assert_job_not_executed(id = @id)
+ assert_not job_executed(id), "Job #{id} was executed"
+ end
+
+ def assert_job_executed_before(first_id, second_id)
+ assert job_executed_at(first_id) < job_executed_at(second_id), "Job #{first_id} was not executed before Job #{second_id}"
+ end
end
diff --git a/activejob/test/jobs/after_discard_retry_job.rb b/activejob/test/jobs/after_discard_retry_job.rb
new file mode 100644
index 0000000000..aeda9a27e7
--- /dev/null
+++ b/activejob/test/jobs/after_discard_retry_job.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+require_relative "../support/job_buffer"
+require "active_support/core_ext/integer/inflections"
+
+class AfterDiscardRetryJob < ActiveJob::Base
+ class UnhandledError < StandardError; end
+ class DefaultsError < StandardError; end
+ class CustomCatchError < StandardError; end
+ class DiscardableError < StandardError; end
+ class CustomDiscardableError < StandardError; end
+ class AfterDiscardError < StandardError; end
+ class ChildAfterDiscardError < AfterDiscardError; end
+
+ retry_on DefaultsError
+ retry_on(CustomCatchError) { |job, error| JobBuffer.add("Dealt with a job that failed to retry in a custom way after #{job.arguments.second} attempts. Message: #{error.message}") }
+ retry_on(AfterDiscardError)
+ retry_on(ChildAfterDiscardError)
+
+ discard_on DiscardableError
+ discard_on(CustomDiscardableError) { |_job, error| JobBuffer.add("Dealt with a job that was discarded in a custom way. Message: #{error.message}") }
+
+ after_discard { |_job, error| JobBuffer.add("Ran after_discard for job. Message: #{error.message}") }
+
+ def perform(raising, attempts)
+ if executions < attempts
+ JobBuffer.add("Raised #{raising} for the #{executions.ordinalize} time")
+ raise raising.constantize
+ else
+ JobBuffer.add("Successfully completed job")
+ end
+ end
+end
diff --git a/activejob/test/jobs/arguments_round_trip_job.rb b/activejob/test/jobs/arguments_round_trip_job.rb
new file mode 100644
index 0000000000..19b241f08e
--- /dev/null
+++ b/activejob/test/jobs/arguments_round_trip_job.rb
@@ -0,0 +1,7 @@
+# frozen_string_literal: true
+
+class ArgumentsRoundTripJob < ActiveJob::Base
+ def perform(*arguments)
+ JobBuffer.add(arguments)
+ end
+end
diff --git a/activejob/test/jobs/configuration_job.rb b/activejob/test/jobs/configuration_job.rb
new file mode 100644
index 0000000000..40d007f024
--- /dev/null
+++ b/activejob/test/jobs/configuration_job.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+require_relative "../support/job_buffer"
+
+class ConfigurationJob < ActiveJob::Base
+ def perform
+ JobBuffer.add(self)
+ end
+end
diff --git a/activejob/test/jobs/enqueue_error_job.rb b/activejob/test/jobs/enqueue_error_job.rb
new file mode 100644
index 0000000000..f6be4b0165
--- /dev/null
+++ b/activejob/test/jobs/enqueue_error_job.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+class EnqueueErrorJob < ActiveJob::Base
+ class EnqueueErrorAdapter
+ class << self
+ attr_accessor :should_raise_sequence
+ end
+ self.should_raise_sequence = []
+
+ def enqueue(*)
+ raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
+ end
+
+ def enqueue_at(*)
+ raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
+ end
+
+ private
+ def should_raise?
+ self.class.should_raise_sequence.empty? || self.class.should_raise_sequence.shift
+ end
+ end
+
+ self.queue_adapter = EnqueueErrorAdapter.new
+
+ def perform
+ raise "This should never be called"
+ end
+end
diff --git a/activejob/test/jobs/logging_job.rb b/activejob/test/jobs/logging_job.rb
index 4605fa6937..d4ec2142f5 100644
--- a/activejob/test/jobs/logging_job.rb
+++ b/activejob/test/jobs/logging_job.rb
@@ -1,8 +1,8 @@
# frozen_string_literal: true
class LoggingJob < ActiveJob::Base
- def perform(dummy)
- logger.info "Dummy, here is it: #{dummy}"
+ def perform(*dummy)
+ logger.info "Dummy, here is it: #{dummy.join(" ")}"
end
def job_id
diff --git a/activejob/test/jobs/rescue_job.rb b/activejob/test/jobs/rescue_job.rb
index 049a8d9adf..0053873995 100644
--- a/activejob/test/jobs/rescue_job.rb
+++ b/activejob/test/jobs/rescue_job.rb
@@ -18,12 +18,18 @@ class OtherError < StandardError; end
JobBuffer.add("DeserializationError original exception was #{e.cause.class.name}")
end
+ rescue_from(NotImplementedError) do
+ JobBuffer.add("rescued from NotImplementedError")
+ end
+
def perform(person = "david")
case person
when "david"
raise ArgumentError, "Hair too good"
when "other"
raise OtherError, "Bad hair"
+ when "rafael"
+ raise NotImplementedError, "Hair is just perfect"
else
JobBuffer.add("performed beautifully")
end
diff --git a/activejob/test/jobs/retry_job.rb b/activejob/test/jobs/retry_job.rb
index 3dfc6f02be..a1b5a0f13e 100644
--- a/activejob/test/jobs/retry_job.rb
+++ b/activejob/test/jobs/retry_job.rb
@@ -10,13 +10,14 @@ class FirstRetryableErrorOfTwo < StandardError; end
class SecondRetryableErrorOfTwo < StandardError; end
class LongWaitError < StandardError; end
class ShortWaitTenAttemptsError < StandardError; end
-class ExponentialWaitTenAttemptsError < StandardError; end
+class PolynomialWaitTenAttemptsError < StandardError; end
class CustomWaitTenAttemptsError < StandardError; end
class CustomCatchError < StandardError; end
class DiscardableError < StandardError; end
class FirstDiscardableErrorOfTwo < StandardError; end
class SecondDiscardableErrorOfTwo < StandardError; end
class CustomDiscardableError < StandardError; end
+class UnlimitedRetryError < StandardError; end
class RetryJob < ActiveJob::Base
retry_on DefaultsError
@@ -25,10 +26,11 @@ class RetryJob < ActiveJob::Base
retry_on FirstRetryableErrorOfTwo, SecondRetryableErrorOfTwo, attempts: 4
retry_on LongWaitError, wait: 1.hour, attempts: 10
retry_on ShortWaitTenAttemptsError, wait: 1.second, attempts: 10
- retry_on ExponentialWaitTenAttemptsError, wait: :exponentially_longer, attempts: 10
+ retry_on PolynomialWaitTenAttemptsError, wait: :polynomially_longer, attempts: 10
retry_on CustomWaitTenAttemptsError, wait: ->(executions) { executions * 2 }, attempts: 10
retry_on(CustomCatchError) { |job, error| JobBuffer.add("Dealt with a job that failed to retry in a custom way after #{job.arguments.second} attempts. Message: #{error.message}") }
retry_on(ActiveJob::DeserializationError) { |job, error| JobBuffer.add("Raised #{error.class} for the #{job.executions} time") }
+ retry_on UnlimitedRetryError, attempts: :unlimited
discard_on DiscardableError
discard_on FirstDiscardableErrorOfTwo, SecondDiscardableErrorOfTwo
@@ -36,7 +38,7 @@ class RetryJob < ActiveJob::Base
before_enqueue do |job|
if job.arguments.include?(:log_scheduled_at) && job.scheduled_at
- JobBuffer.add("Next execution scheduled at #{job.scheduled_at}")
+ JobBuffer.add("Next execution scheduled at #{job.scheduled_at.to_f}")
end
end
diff --git a/activejob/test/serializers/time_with_zone_serializer_test.rb b/activejob/test/serializers/time_with_zone_serializer_test.rb
new file mode 100644
index 0000000000..5e0f8b645a
--- /dev/null
+++ b/activejob/test/serializers/time_with_zone_serializer_test.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+require "helper"
+
+class TimeWithZoneSerializerTest < ActiveSupport::TestCase
+ test "#deserialize preserves serialized time zone" do
+ time_zone_before = Time.zone
+
+ Time.zone = "America/Los_Angeles"
+ time_with_zone = Time.parse("08:00").in_time_zone
+
+ assert_equal "America/Los_Angeles", time_with_zone.time_zone.tzinfo.name
+
+ serialized = ActiveJob::Serializers::TimeWithZoneSerializer.serialize(time_with_zone)
+ Time.zone = "Europe/London"
+ deserialized = ActiveJob::Serializers::TimeWithZoneSerializer.deserialize(serialized)
+
+ assert_equal "America/Los_Angeles", deserialized.time_zone.tzinfo.name
+ assert_equal time_with_zone, deserialized
+ ensure
+ Time.zone = time_zone_before
+ end
+end
diff --git a/activejob/test/support/integration/adapters/backburner.rb b/activejob/test/support/integration/adapters/backburner.rb
index 0c248dda01..19209d6270 100644
--- a/activejob/test/support/integration/adapters/backburner.rb
+++ b/activejob/test/support/integration/adapters/backburner.rb
@@ -9,7 +9,7 @@ def setup
end
unless can_run?
puts "Cannot run integration tests for backburner. To be able to run integration tests for backburner you need to install and start beanstalkd.\n"
- status = ENV["CI"] ? false : true
+ status = ENV["BUILDKITE"] ? false : true
exit status
end
end
diff --git a/activejob/test/support/integration/adapters/queue_classic.rb b/activejob/test/support/integration/adapters/queue_classic.rb
index 2b5375461a..eaaef88392 100644
--- a/activejob/test/support/integration/adapters/queue_classic.rb
+++ b/activejob/test/support/integration/adapters/queue_classic.rb
@@ -14,11 +14,22 @@ def clear_jobs
def start_workers
uri = URI.parse(ENV["QC_DATABASE_URL"])
+ host = uri.host
+ port = uri.port
user = uri.user || ENV["USER"]
pass = uri.password
db = uri.path[1..-1]
- %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -X -c 'drop database if exists "#{db}"' -U #{user} -t template1}
- %x{#{"PGPASSWORD=\"#{pass}\"" if pass} psql -X -c 'create database "#{db}"' -U #{user} -t template1}
+
+ psql = [].tap do |args|
+ args << "PGPASSWORD=\"#{pass}\"" if pass
+ args << "psql -X -U #{user} -t template1"
+ args << "-h #{host}" if host
+ args << "-p #{port}" if port
+ end.join(" ")
+
+ %x{#{psql} -c 'drop database if exists "#{db}"'}
+ %x{#{psql} -c 'create database "#{db}"'}
+
QC::Setup.create
QC.default_conn_adapter.disconnect
@@ -30,7 +41,7 @@ def start_workers
rescue PG::ConnectionBad
puts "Cannot run integration tests for queue_classic. To be able to run integration tests for queue_classic you need to install and start postgresql.\n"
- status = ENV["CI"] ? false : true
+ status = ENV["BUILDKITE"] ? false : true
exit status
end
diff --git a/activejob/test/support/integration/adapters/resque.rb b/activejob/test/support/integration/adapters/resque.rb
index cd129e72b2..d12c696c1b 100644
--- a/activejob/test/support/integration/adapters/resque.rb
+++ b/activejob/test/support/integration/adapters/resque.rb
@@ -3,11 +3,11 @@
module ResqueJobsManager
def setup
ActiveJob::Base.queue_adapter = :resque
- Resque.redis = Redis::Namespace.new "active_jobs_int_test", redis: Redis.new(url: ENV["REDIS_URL"] || "redis://127.0.0.1:6379/12", thread_safe: true)
+ Resque.redis = Redis::Namespace.new "active_jobs_int_test", redis: Redis.new(url: ENV["REDIS_URL"] || "redis://127.0.0.1:6379/12")
Resque.logger = Rails.logger
unless can_run?
puts "Cannot run integration tests for resque. To be able to run integration tests for resque you need to install and start redis.\n"
- status = ENV["CI"] ? false : true
+ status = ENV["BUILDKITE"] ? false : true
exit status
end
end
diff --git a/activejob/test/support/integration/adapters/sidekiq.rb b/activejob/test/support/integration/adapters/sidekiq.rb
index 775c588d73..5309391604 100644
--- a/activejob/test/support/integration/adapters/sidekiq.rb
+++ b/activejob/test/support/integration/adapters/sidekiq.rb
@@ -10,7 +10,7 @@ def setup
ActiveJob::Base.queue_adapter = :sidekiq
unless can_run?
puts "Cannot run integration tests for sidekiq. To be able to run integration tests for sidekiq you need to install and start redis.\n"
- status = ENV["CI"] ? false : true
+ status = ENV["BUILDKITE"] ? false : true
exit status
end
end
@@ -25,6 +25,7 @@ def start_workers
death_read, death_write = IO.pipe
@pid = fork do
+ Sidekiq.redis_pool.reload(&:close)
continue_read.close
death_write.close
@@ -36,7 +37,7 @@ def start_workers
$stderr.sync = true
logfile = Rails.root.join("log/sidekiq.log").to_s
- Sidekiq.logger = Sidekiq::Logger.new(logfile)
+ set_logger(Sidekiq::Logger.new(logfile))
self_read, self_write = IO.pipe
trap "TERM" do
@@ -53,7 +54,18 @@ def start_workers
require "sidekiq/cli"
require "sidekiq/launcher"
- if Sidekiq.respond_to?(:[]=)
+ if Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new("7")
+ config = Sidekiq.default_configuration
+ config.queues = ["integration_tests"]
+ config.concurrency = 1
+ config.average_scheduled_poll_interval = 0.5
+ config.merge!(
+ environment: "test",
+ timeout: 1,
+ poll_interval_average: 1
+ )
+ elsif Sidekiq.respond_to?(:[]=)
+ # Sidekiq 6.5
config = Sidekiq
config[:queues] = ["integration_tests"]
config[:environment] = "test"
@@ -64,12 +76,12 @@ def start_workers
queues: ["integration_tests"],
environment: "test",
concurrency: 1,
- timeout: 1
+ timeout: 1,
+ average_scheduled_poll_interval: 0.5,
+ poll_interval_average: 1
}
end
sidekiq = Sidekiq::Launcher.new(config)
- Sidekiq.average_scheduled_poll_interval = 0.5
- Sidekiq.options[:poll_interval_average] = 1
begin
sidekiq.run
continue_write.puts "started"
@@ -100,10 +112,22 @@ def stop_workers
def can_run?
begin
Sidekiq.redis(&:info)
- Sidekiq.logger = nil
- rescue
- return false
+ rescue => e
+ if e.class.to_s.include?("CannotConnectError")
+ return false
+ else
+ raise
+ end
end
+ set_logger(nil)
true
end
+
+ def set_logger(logger)
+ if Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new("7")
+ Sidekiq.default_configuration.logger = logger
+ else
+ Sidekiq.logger = logger
+ end
+ end
end
diff --git a/activejob/test/support/integration/adapters/sneakers.rb b/activejob/test/support/integration/adapters/sneakers.rb
index 89dc61ca28..add755b399 100644
--- a/activejob/test/support/integration/adapters/sneakers.rb
+++ b/activejob/test/support/integration/adapters/sneakers.rb
@@ -18,7 +18,7 @@ def setup
log: Rails.root.join("log/sneakers.log").to_s
unless can_run?
puts "Cannot run integration tests for sneakers. To be able to run integration tests for sneakers you need to install and start rabbitmq.\n"
- status = ENV["CI"] ? false : true
+ status = ENV["BUILDKITE"] ? false : true
exit status
end
end
@@ -31,7 +31,7 @@ def start_workers
@pid = fork do
queues = %w(integration_tests)
workers = queues.map do |q|
- worker_klass = "ActiveJobWorker" + Digest::MD5.hexdigest(q)
+ worker_klass = "ActiveJobWorker" + OpenSSL::Digest::MD5.hexdigest(q)
Sneakers.const_set(worker_klass, Class.new(ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper) do
from_queue q
end)
diff --git a/activejob/test/support/integration/helper.rb b/activejob/test/support/integration/helper.rb
index 960d1f8abd..58b917ac29 100644
--- a/activejob/test/support/integration/helper.rb
+++ b/activejob/test/support/integration/helper.rb
@@ -9,7 +9,7 @@
dummy_app_path = Dir.mktmpdir + "/dummy"
dummy_app_template = File.expand_path("dummy_app_template.rb", __dir__)
args = Rails::Generators::ARGVScrubber.new(["new", dummy_app_path, "--skip-gemfile", "--skip-bundle",
- "--skip-git", "--skip-spring", "-d", "sqlite3", "--skip-javascript", "--force", "--quiet",
+ "--skip-git", "-d", "sqlite3", "--skip-javascript", "--force", "--quiet",
"--template", dummy_app_template]).prepare!
Rails::Generators::AppGenerator.start args
diff --git a/activejob/test/support/integration/test_case_helpers.rb b/activejob/test/support/integration/test_case_helpers.rb
index d8931a74ed..69b8bc2672 100644
--- a/activejob/test/support/integration/test_case_helpers.rb
+++ b/activejob/test/support/integration/test_case_helpers.rb
@@ -27,10 +27,6 @@ def clear_jobs
jobs_manager.clear_jobs
end
- def adapter_is?(*adapter_class_symbols)
- adapter_class_symbols.map(&:to_s).include? ActiveJob::Base.queue_adapter_name
- end
-
def wait_for_jobs_to_finish_for(seconds = 60)
Timeout.timeout(seconds) do
while !job_executed do
diff --git a/activejob/test/support/job_buffer.rb b/activejob/test/support/job_buffer.rb
index 45a6437685..aa6c41592e 100644
--- a/activejob/test/support/job_buffer.rb
+++ b/activejob/test/support/job_buffer.rb
@@ -19,3 +19,9 @@ def last_value
end
end
end
+
+class ActiveSupport::TestCase
+ teardown do
+ JobBuffer.clear
+ end
+end