Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save rmacklin/686a3d284147c009e305527f32abfccd to your computer and use it in GitHub Desktop.
Save rmacklin/686a3d284147c009e305527f32abfccd to your computer and use it in GitHub Desktop.

Backporting Rails 6 parallel testing to Rails 5

How this patch was created

I started from the PR that added parallel testing support to Rails: https://github.com/rails/rails/pull/31900/files

I opened each of the files and then viewed the latest versions of those files ("latest" meaning when rails/master pointed at https://github.com/rails/rails/commit/52125dc0f8669d8dd497427c7b177d5d04106e0c).

For brand new files, I just copied the code. For modifications to existing files, I diffed those files between 5-2-stable and master to see if there were other important changes that are required by the parallel testing code. Fortunately, this was only the case for ActiveRecord::TestDatabases, which made use of the configs_for method that was introduced in this refactoring: https://github.com/rails/rails/commit/fdf3f0b9306ba8145e6e3acb84a50e5d23dfe48c More fortunately, the implementation in Eileen's original PR did not require those changes, and that implementation was sufficient for my application.

Note that I had to override the entire ActiveRecord::TestFixtures#setup_fixtures method just to patch in the two if lock_threads guard clauses that Eileen added. Not pretty, but it works (and I don't think that specific behavior can be patched more granularly).

How I use this patch

In config/application.rb I added a line, after we require rails, to require the parallel_testing_backport patch from lib:

require 'rails/all'
require_relative '../lib/parallel_testing_backport'

See the complete sample rails 5.2 app here: https://github.com/rmacklin/rails_parallel_testing_experiments/tree/rails-5

Why is this useful?

For me, this is useful to speed up running our tests before we upgrade our apps to Rails 6. I hope to follow in GitHub and Shopify's footsteps and get to the point where we can continuously upgrade our apps to rails/master, but we're not there yet, and this was useful in the meantime.

Should this be turned into a gem?

I thought about it, and currently I think a gem feels unnecessary. This is a temporary patch that will go away as soon as an application is upgraded to Rails 6. Additionally, the process for putting this patch into an existing Rails 5 app is copying a single file and requireing it. If the process was more involved than that, a gem might provide enough value.

That said, I'm still open to the idea.

unless Rails::VERSION::MAJOR == 5
raise 'This patch backports Rails 6 parallel test support into Rails 5. '\
'It should be removed after the app has been upgraded to Rails 6.'
end
require "concurrent/utility/processor_counter"
require "drb"
require "drb/unix" unless Gem.win_platform?
require "active_support/core_ext/module/attribute_accessors"
module ActiveSupport
module Testing
class Parallelization # :nodoc:
class Server
include DRb::DRbUndumped
def initialize
@queue = Queue.new
end
def record(reporter, result)
raise DRb::DRbConnError if result.is_a?(DRb::DRbUnknown)
reporter.synchronize do
reporter.record(result)
end
end
def <<(o)
o[2] = DRbObject.new(o[2]) if o
@queue << o
end
def length
@queue.length
end
def pop; @queue.pop; end
end
@@after_fork_hooks = []
def self.after_fork_hook(&blk)
@@after_fork_hooks << blk
end
cattr_reader :after_fork_hooks
@@run_cleanup_hooks = []
def self.run_cleanup_hook(&blk)
@@run_cleanup_hooks << blk
end
cattr_reader :run_cleanup_hooks
def initialize(queue_size)
@queue_size = queue_size
@queue = Server.new
@pool = []
@url = DRb.start_service("drbunix:", @queue).uri
end
def after_fork(worker)
self.class.after_fork_hooks.each do |cb|
cb.call(worker)
end
end
def run_cleanup(worker)
self.class.run_cleanup_hooks.each do |cb|
cb.call(worker)
end
end
def start
@pool = @queue_size.times.map do |worker|
title = "Rails test worker #{worker}"
fork do
Process.setproctitle("#{title} - (starting)")
DRb.stop_service
begin
after_fork(worker)
rescue => setup_exception; end
queue = DRbObject.new_with_uri(@url)
while job = queue.pop
klass = job[0]
method = job[1]
reporter = job[2]
Process.setproctitle("#{title} - #{klass}##{method}")
result = klass.with_info_handler reporter do
Minitest.run_one_method(klass, method)
end
add_setup_exception(result, setup_exception) if setup_exception
begin
queue.record(reporter, result)
rescue DRb::DRbConnError
result.failures.map! do |failure|
if failure.respond_to?(:error)
# minitest >5.14.0
error = DRb::DRbRemoteError.new(failure.error)
else
error = DRb::DRbRemoteError.new(failure.exception)
end
Minitest::UnexpectedError.new(error)
end
queue.record(reporter, result)
end
Process.setproctitle("#{title} - (idle)")
end
ensure
Process.setproctitle("#{title} - (stopping)")
run_cleanup(worker)
end
end
end
def <<(work)
@queue << work
end
def shutdown
@queue_size.times { @queue << nil }
@pool.each { |pid| Process.waitpid pid }
if @queue.length > 0
raise "Queue not empty, but all workers have finished. This probably means that a worker crashed and #{@queue.length} tests were missed."
end
end
private
def add_setup_exception(result, setup_exception)
result.failures.prepend Minitest::UnexpectedError.new(setup_exception)
end
end
end
end
module ActiveRecord
module TestDatabases # :nodoc:
ActiveSupport::Testing::Parallelization.after_fork_hook do |i|
create_and_load_schema(i, env_name: Rails.env)
end
ActiveSupport::Testing::Parallelization.run_cleanup_hook do
drop(env_name: Rails.env)
end
def self.create_and_load_schema(i, env_name:)
old, ENV["VERBOSE"] = ENV["VERBOSE"], "false"
connection_spec = ActiveRecord::Base.configurations[env_name]
connection_spec["database"] += "-#{i}"
ActiveRecord::Tasks::DatabaseTasks.create(connection_spec)
ActiveRecord::Tasks::DatabaseTasks.load_schema(connection_spec)
ensure
ActiveRecord::Base.establish_connection(Rails.env.to_sym)
ENV["VERBOSE"] = old
end
def self.drop(env_name:)
old, ENV["VERBOSE"] = ENV["VERBOSE"], "false"
connection_spec = ActiveRecord::Base.configurations[env_name]
ActiveRecord::Tasks::DatabaseTasks.drop(connection_spec)
ensure
ENV["VERBOSE"] = old
end
end
end
module ActiveRecord
module TestFixturesExtension
extend ActiveSupport::Concern
included do
class_attribute :lock_threads, default: true
end
end
module TestFixtures
def setup_fixtures(config = ActiveRecord::Base)
if pre_loaded_fixtures && !use_transactional_tests
raise RuntimeError, "pre_loaded_fixtures requires use_transactional_tests"
end
@fixture_cache = {}
@fixture_connections = []
@@already_loaded_fixtures ||= {}
@connection_subscriber = nil
# Load fixtures once and begin transaction.
if run_in_transaction?
if @@already_loaded_fixtures[self.class]
@loaded_fixtures = @@already_loaded_fixtures[self.class]
else
@loaded_fixtures = load_fixtures(config)
@@already_loaded_fixtures[self.class] = @loaded_fixtures
end
# Begin transactions for connections already established
@fixture_connections = enlist_fixture_connections
@fixture_connections.each do |connection|
connection.begin_transaction joinable: false
connection.pool.lock_thread = true if lock_threads
end
# When connections are established in the future, begin a transaction too
@connection_subscriber = ActiveSupport::Notifications.subscribe("!connection.active_record") do |_, _, _, _, payload|
spec_name = payload[:spec_name] if payload.key?(:spec_name)
if spec_name
begin
connection = ActiveRecord::Base.connection_handler.retrieve_connection(spec_name)
rescue ConnectionNotEstablished
connection = nil
end
if connection && !@fixture_connections.include?(connection)
connection.begin_transaction joinable: false
connection.pool.lock_thread = true if lock_threads
@fixture_connections << connection
end
end
end
# Load fixtures for every test.
else
ActiveRecord::FixtureSet.reset_cache
@@already_loaded_fixtures[self.class] = nil
@loaded_fixtures = load_fixtures(config)
end
# Instantiate fixtures for every test if requested.
instantiate_fixtures if use_instantiated_fixtures
end
end
end
module ActiveSupport
class TestCase
include ActiveRecord::TestDatabases
include ActiveRecord::TestFixturesExtension
class << self
# Parallelizes the test suite.
#
# Takes a +workers+ argument that controls how many times the process
# is forked. For each process a new database will be created suffixed
# with the worker number.
#
# test-database-0
# test-database-1
#
# If <tt>ENV["PARALLEL_WORKERS"]</tt> is set the workers argument will be ignored
# and the environment variable will be used instead. This is useful for CI
# environments, or other environments where you may need more workers than
# you do for local testing.
#
# If the number of workers is set to +1+ or fewer, the tests will not be
# parallelized.
#
# If +workers+ is set to +:number_of_processors+, the number of workers will be
# set to the actual core count on the machine you are on.
#
# The default parallelization method is to fork processes. If you'd like to
# use threads instead you can pass <tt>with: :threads</tt> to the +parallelize+
# method. Note the threaded parallelization does not create multiple
# database and will not work with system tests at this time.
#
# parallelize(workers: :number_of_processors, with: :threads)
#
# The threaded parallelization uses minitest's parallel executor directly.
# The processes parallelization uses a Ruby DRb server.
def parallelize(workers: :number_of_processors, with: :processes)
workers = Concurrent.physical_processor_count if workers == :number_of_processors
workers = ENV["PARALLEL_WORKERS"].to_i if ENV["PARALLEL_WORKERS"]
return if workers <= 1
executor = case with
when :processes
Testing::Parallelization.new(workers)
when :threads
Minitest::Parallel::Executor.new(workers)
else
raise ArgumentError, "#{with} is not a supported parallelization executor."
end
self.lock_threads = false if defined?(self.lock_threads) && with == :threads
Minitest.parallel_executor = executor
parallelize_me!
end
# Set up hook for parallel testing. This can be used if you have multiple
# databases or any behavior that needs to be run after the process is forked
# but before the tests run.
#
# Note: this feature is not available with the threaded parallelization.
#
# In your +test_helper.rb+ add the following:
#
# class ActiveSupport::TestCase
# parallelize_setup do
# # create databases
# end
# end
def parallelize_setup(&block)
ActiveSupport::Testing::Parallelization.after_fork_hook do |worker|
yield worker
end
end
# Clean up hook for parallel testing. This can be used to drop databases
# if your app uses multiple write/read databases or other clean up before
# the tests finish. This runs before the forked process is closed.
#
# Note: this feature is not available with the threaded parallelization.
#
# In your +test_helper.rb+ add the following:
#
# class ActiveSupport::TestCase
# parallelize_teardown do
# # drop databases
# end
# end
def parallelize_teardown(&block)
ActiveSupport::Testing::Parallelization.run_cleanup_hook do |worker|
yield worker
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment