Skip to content

Instantly share code, notes, and snippets.

@radanskoric
Last active October 17, 2023 09:33
Show Gist options
  • Save radanskoric/48f1982f2fe80b7d3bb44680f6d292aa to your computer and use it in GitHub Desktop.
Save radanskoric/48f1982f2fe80b7d3bb44680f6d292aa to your computer and use it in GitHub Desktop.
Rails 7.1 async database query execution key functionality recreated as plain Ruby, for better understanding
# == NOTE ==
# This gist accompanies a blog post on the topic: https://radanskoric.com/articles/understand-rails-async-db-queries
require 'bundler/inline'
require "benchmark"
require "debug"
require "uri"
# Inline Gemfile: declares the gems this script depends on, so it can be run
# as a single file without a separate Gemfile next to it.
gemfile do
  source "https://rubygems.org"

  gem "activerecord"
  gem "pg"
end
require 'active_record'
# Use the :global_thread_pool executor for async queries.
ActiveRecord.async_query_executor = :global_thread_pool
# This setting controls the number of threads in the async executor pool.
# Default value is 4. Uncomment to change it.
# ActiveRecord.global_executor_concurrency = 4

# To run this script, set the DATABASE_URL env var to a connection URL like:
# "postgresql://user:password@localhost/database_name"
# ENV.fetch is used so that a missing variable fails with a KeyError that names it,
# instead of an unrelated URI parsing error.
db_config = URI.parse(ENV.fetch("DATABASE_URL"))
ActiveRecord::Base.establish_connection(
  adapter: db_config.scheme,
  host: db_config.host,
  port: db_config.port, # nil when the URL does not specify one
  username: db_config.user,
  password: db_config.password,
  database: db_config.path[1..], # drop the leading "/" of the URL path
)
# I'm assuming that the database has a users table, as most do, but if you are trying to run
# this script on a database that doesn't have a users table, just rename it to match a table
# that exists. We're not actually doing anything with the records so any table will do.
# Model used only as a source of queries; the records themselves are never inspected.
class User < ActiveRecord::Base
  # Deliberately slow query: the WHERE condition is a subquery that calls
  # pg_sleep(1), and the relation is limited to a single row.
  scope :slow, -> { where("SELECT true FROM pg_sleep(1)").limit(1) }
end
# Convenience alias for the model class defined above. If you end up using
# something other than User, this assignment is meant to be the last place
# that needs to change: code after this line should refer to MODEL.
MODEL = User
# Plain-Ruby recreation of the core of async query loading: each query is handed
# to a thread pool and represented by a FutureResult the caller can block on later.
class AsyncQueryLoader
  # A very very stripped down version of ActiveRecord::FutureResult.
  #
  # The query is executed at most once: either by a background thread calling
  # #execute_or_skip or by the thread calling #result, whichever takes the mutex first.
  # Almost all error handling is removed; the one thing kept is that an error raised
  # by the query is remembered and re-raised from #result, so that a failure on a
  # background thread is not silently turned into a nil result.
  class FutureResult
    # Everything except the db pool is just passed through to the low level execution method.
    #
    # @param database_connection_pool [#with_connection, #connection] pool that supplies connections
    # @param args positional arguments forwarded to `internal_exec_query`
    # @param kwargs keyword arguments forwarded to `internal_exec_query`
    def initialize(database_connection_pool, *args, **kwargs)
      @pool = database_connection_pool
      @mutex = Mutex.new
      @pending = true
      @error = nil
      @args = args
      @kwargs = kwargs
    end

    # Executes the query unless it has already run or is being run by another thread.
    # A StandardError raised by the query is not raised here; it is stored and
    # surfaces when #result is called.
    def execute_or_skip
      return unless @pending

      @pool.with_connection do |connection|
        # If someone already has the mutex they're executing the query so we
        # don't need to do anything.
        return unless @mutex.try_lock

        begin
          # Check again if it is pending in case it changed while we were
          # entering the mutex.
          execute_query(connection) if @pending
        ensure
          @mutex.unlock
        end
      end
    end

    # Returns the query result, executing the query on the calling thread if nobody
    # has started it yet, or waiting for the thread that is currently executing it.
    #
    # @return the value returned by `internal_exec_query`
    # @raise [StandardError] the error raised by the query, if it failed
    def result
      if @pending
        # If the query is currently being executed the executing thread will hold the mutex.
        # So the way we actually wait for execution to finish is by
        # waiting to enter the mutex.
        @mutex.synchronize do
          # Check again if it is pending in case it changed while we were
          # entering the mutex.
          execute_query(@pool.connection) if @pending
        end
      end

      raise @error if @error

      @result
    end

    private

    # Runs the query on the given connection and records the outcome.
    # Always clears the pending flag, whether the query succeeded or failed.
    def execute_query(connection)
      @result = connection.internal_exec_query(*@args, **@kwargs)
    rescue => error
      @error = error
    ensure
      @pending = false
    end
  end

  # @param concurrency [Integer] maximum number of background threads. The default
  #   of 4 is the value used if you don't set global_executor_concurrency.
  def initialize(concurrency: 4)
    # These are the default settings that Rails will use.
    @async_executor = Concurrent::ThreadPoolExecutor.new(
      min_threads: 0,
      max_threads: concurrency,
      max_queue: concurrency * 4, # Rails sets the queue at 4 x max_threads.
      fallback_policy: :caller_runs # If queue is full, run it right away
    )
  end

  # Schedules the query for background execution.
  #
  # @param query an object responding to `arel` (the script passes a scope result)
  # @return [FutureResult] handle whose #result returns the query result
  def run_async(query)
    connection = ActiveRecord::Base.connection
    # Called through `send`; presumably to_sql_and_binds is not a public method.
    sql, binds, _preparable = connection.send(:to_sql_and_binds, query.arel)
    future_result = FutureResult.new(connection.pool, sql, "Name", binds, prepare: false)
    @async_executor.post { future_result.execute_or_skip }
    future_result
  end
end
# If you imagine that you have one of these loader objects per PROCESS,
# that will be an accurate approximation of what Rails does.
async_loader = AsyncQueryLoader.new

# We run two queries and observe that the whole thing takes just a bit
# above 1 second, since they are running in parallel.
res = Benchmark.measure do
  # Try increasing the number of queries:
  # - at 4 you will see that it still takes around 1 second, using all threads from the executor pool.
  # - at 5 it's still around 1 second. Can you see why? :) (Hint: there's more than 4 threads running in total)
  # - at 6 it will go to 2 seconds as the last query needs to wait for others to finish.
  2.times
    .map { MODEL.slow } # MODEL, not User, so switching models only touches the MODEL assignment
    .map { |query| async_loader.run_async(query) }
    .each(&:result) # block until every query has finished
end
puts res
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment