Last active
October 17, 2023 09:33
-
-
Save radanskoric/48f1982f2fe80b7d3bb44680f6d292aa to your computer and use it in GitHub Desktop.
Rails 7.1 async database query execution key functionality recreated as plain Ruby, for better understanding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# == NOTE ==
# This gist accompanies a blog post on the topic: https://radanskoric.com/articles/understand-rails-async-db-queries
require 'bundler/inline' | |
require "benchmark" | |
require "debug" | |
require "uri" | |
gemfile do | |
source 'https://rubygems.org' | |
gem 'activerecord' | |
gem 'pg' | |
end | |
require 'active_record' | |
ActiveRecord.async_query_executor = :global_thread_pool
# This setting controls the number of threads in the async executor pool.
# Default value is 4. Uncomment to change it.
# ActiveRecord.global_executor_concurrency = 4

# To run this script, define the DATABASE_URL env var to be a connection url like:
# "postgresql://user:password@localhost/database_name"
# ENV.fetch raises a KeyError naming the variable when it is not set, instead
# of handing nil to URI.parse.
db_config = URI.parse(ENV.fetch('DATABASE_URL'))

# Components missing from the URL (e.g. no explicit port or password) come
# back as nil; compact drops them so only the provided settings are passed on.
connection_settings = {
  adapter: db_config.scheme,
  host: db_config.host,
  port: db_config.port,
  username: db_config.user,
  password: db_config.password,
  database: db_config.path[1..],
}.compact
ActiveRecord::Base.establish_connection(connection_settings)
# This assumes the database has a users table, as most do. If you are running
# this script against a database without one, rename the class to match a table
# that exists. We're not actually doing anything with the records, so any table
# will do.
class User < ActiveRecord::Base
  # A deliberately slow query: the condition calls pg_sleep(1).
  scope :slow, -> { where("SELECT true FROM pg_sleep(1)").limit(1) }
end

# Alias for convenience: if you end up using a model other than User, this is
# the last line that needs to change.
MODEL = User
# Plain-Ruby re-creation of the core of async query execution: queries are
# posted to a thread pool and the caller gets back a FutureResult it can block
# on later.
class AsyncQueryLoader
  # A very, very stripped-down version of ActiveRecord::FutureResult.
  # Most of the error handling is removed; the one piece kept is that an error
  # raised while running the query is remembered and re-raised from #result,
  # so a failure on a background thread does not silently turn into nil.
  class FutureResult
    # Everything except the db pool is just passed through to the low-level
    # execution method.
    #
    # @param database_connection_pool [#with_connection, #connection] pool the
    #   query connection is taken from
    # @param args positional arguments forwarded to internal_exec_query
    # @param kwargs keyword arguments forwarded to internal_exec_query
    def initialize(database_connection_pool, *args, **kwargs)
      @pool = database_connection_pool
      @mutex = Mutex.new
      @pending = true
      @result = nil
      @error = nil
      @args = args
      @kwargs = kwargs
    end

    # Runs the query unless it already ran or another thread is running it
    # right now. This is what the thread pool calls.
    #
    # @return [void]
    def execute_or_skip
      return unless @pending

      @pool.with_connection do |connection|
        # If someone already has the mutex they're executing the query, so we
        # don't need to do anything.
        return unless @mutex.try_lock

        begin
          # Check again if it is pending in case it changed while we were
          # entering the mutex.
          execute_query(connection) if @pending
        ensure
          @mutex.unlock
        end
      end
    end

    # Returns the query result, running the query on the calling thread if
    # nobody has started it yet, or waiting for the thread that is running it.
    #
    # @return [Object] whatever internal_exec_query returned
    # @raise [StandardError] the error raised by the query, if it failed
    def result
      if @pending
        # If the query is currently being executed, the executing thread holds
        # the mutex. So the way we actually wait for execution to finish is by
        # waiting to enter the mutex.
        @mutex.synchronize do
          # Check again if it is pending in case it changed while we were
          # entering the mutex.
          execute_query(@pool.connection) if @pending
        end
      end
      raise @error if @error

      @result
    end

    private

    # Runs the query on the given connection and records the outcome: either
    # the result or the error. Either way the future stops being pending.
    def execute_query(connection)
      @result = connection.internal_exec_query(*@args, **@kwargs)
    rescue => error
      @error = error
    ensure
      @pending = false
    end
  end

  # @param max_threads [Integer] size of the executor pool. 4 is the default
  #   value Rails uses if you don't set global_executor_concurrency.
  def initialize(max_threads: 4)
    # These are the default settings that Rails will use.
    @async_executor = Concurrent::ThreadPoolExecutor.new(
      min_threads: 0,
      max_threads: max_threads,
      max_queue: max_threads * 4, # Rails sets the queue at 4 x max_threads.
      fallback_policy: :caller_runs # If queue is full, run it right away
    )
  end

  # Schedules the query on the thread pool and immediately returns a future.
  #
  # @param query [#arel] the query to run; its arel is turned into SQL and binds
  # @return [FutureResult] call #result on it to get (or wait for) the rows
  def run_async(query)
    connection = ActiveRecord::Base.connection
    sql, binds, _preparable = connection.send(:to_sql_and_binds, query.arel)
    future_result = FutureResult.new(connection.pool, sql, "Name", binds, prepare: false)
    @async_executor.post { future_result.execute_or_skip }
    future_result
  end
end
# If you imagine that you have one of these loader objects per PROCESS,
# that will be an accurate approximation of what Rails does.
async_loader = AsyncQueryLoader.new

# We run two queries and observe that together they take just a bit
# above 1 second, since they are running in parallel.
res = Benchmark.measure do
  # Try increasing the number of queries:
  # - at 4 you will see that it still takes around 1 second, using all threads from the executor pool.
  # - at 5 it's still around 1 second. Can you see why? :) (Hint: there's more than 4 threads running in total)
  # - at 6 it will go to 2 seconds as the last query needs to wait for others to finish.
  #
  # The queries go through MODEL rather than User so the script keeps working
  # when the model class has been renamed to match another table.
  2.times
    .map { MODEL.slow }
    .map { |query| async_loader.run_async(query) }
    .each(&:result)
end
puts res
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment