Skip to content

Instantly share code, notes, and snippets.

@tonywok
Last active May 27, 2021 10:14
Show Gist options
  • Save tonywok/76498335b69d7f411a0fb8d06140e6f0 to your computer and use it in GitHub Desktop.
Save tonywok/76498335b69d7f411a0fb8d06140e6f0 to your computer and use it in GitHub Desktop.
PostgresSQL Async DB Adapter
gem "db"
gem "db-postgres"
gem "thread-local"
require "db"
require "db/postgres"
require "active_record/connection_adapters/postgresql_adapter"
module ActiveRecord
# NOTE: This is the active record connection entry point.
# This method is called internally based on the adapter specified in database.yml
module ConnectionHandling # :nodoc:
def postgresql_db_connection(config)
conn_params = config.symbolize_keys.compact
# Map database.yaml config keys to db-postgres config keys
conn_params[:user] = conn_params.delete(:username) if conn_params[:username]
ConnectionAdapters::PostgreSQLDbAdapter.new(
ConnectionAdapters::PostgreSQLDbAdapter.new_client(conn_params),
logger,
conn_params,
config,
)
end
end
module ConnectionAdapters
# NOTE: We need to ensure that each thread that AR spins up utilizes the same underlying DB client
# Without this,
module ThreadLocalClients
extend Thread::Local
def self.local
{}
end
def self.lookup(adapter, credentials)
key = [adapter, credentials]
self.instance[key] ||= DB::Client.new(adapter.new(**credentials))
end
end
# NOTE: We have to freedom patch the connection pool because it's currently pooling threads.
# We want each fiber to use its own connection
# There is some nuance here around transactions requiring sticky connections s.t fibers don't step on one another
class ConnectionPool
def connection_cache_key(_thread)
Fiber.current
end
end
# NOTE: Here we forward database interactions to db
# NOTE: Currently inheriting from PostgreSQLAdapter, but we may have to revisit this as it's super coupled to pg gem
# This will involve potentially adding necessary features to db and db-postgres, respectively.
class PostgreSQLDbAdapter < PostgreSQLAdapter
ADAPTER_NAME = "PostgreSQLDb"
class << self
def new_client(conn_params)
thread_local_client = ThreadLocalClients.lookup(DB::Postgres::Adapter, conn_params)
DbPostgresShim.new(thread_local_client)
end
end
class DbPostgresShim
NotImplemented = Class.new(StandardError)
attr_reader :client
def initialize(client)
@client = client
end
def query(statement)
session = client.session
result = nil
Sync do
result = session.call(statement)
ensure
session.close
end
result
end
def status
end
def reset
end
def transaction_status
end
def close
end
def socket_io
end
def server_version
query("SELECT VERSION()")
end
def exec_params(sql, type_casted_binds)
end
def exec_prepared(stmt_key, type_casted_binds)
end
def prepare(nextkey, sql)
end
def get_last_result
end
def set_client_encoding(encoding)
end
def type_map_for_queries=(map)
end
def type_map_for_results=(map)
end
def type_map_for_results
# returns something that responds to add_coder(@timestamp_decoder)
end
def method_missing(method, *args, &block)
raise(NotImplemented, "'#{method}' called with '#{args}'")
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment