Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save reidmorrison/1356f5811de522a2495a to your computer and use it in GitHub Desktop.
Save reidmorrison/1356f5811de522a2495a to your computer and use it in GitHub Desktop.
Return a single row at a time from ActiveRecord without having to load them all into memory first
if defined?(Java)
class ActiveRecord::ConnectionAdapters::JdbcAdapter
# Returns each row as a hash
# Example:
# Inquiry.connection.exec_query_each('select * from inquiries') do |row|
# ap row
# end
def exec_query_each(sql, name = 'SQL', binds = [], &block)
log(sql, name) { @connection.exec_query_each(sql, binds, &block) }
end
end
end
if defined?(Java)
class ActiveRecord::ConnectionAdapters::JdbcConnection
def exec_query_each(sql, binds = [])
#class Java::ArjdbcJdbc::RubyJdbcConnection
#class ActiveRecord::ConnectionAdapters::JdbcConnection
#class Java::ArjdbcMysql::MySQLRubyJdbcConnection
#class ActiveRecord::ConnectionAdapters::MySQLJdbcConnection
begin
stmt = @connection.createStatement(java.sql::ResultSet::TYPE_FORWARD_ONLY, java.sql::ResultSet::CONCUR_READ_ONLY, java.sql::ResultSet::CLOSE_CURSORS_AT_COMMIT)
stmt.fetch_size = java.lang::Integer::MIN_VALUE
result_set = stmt.executeQuery(sql)
meta_data = result_set.get_meta_data
# We freeze the strings to prevent them getting duped when
# used as keys in ActiveRecord::Base's @attributes hash
# String name = resultMetaData.getColumnLabel(i);
# if ( downCase ) {
# name = name.toLowerCase();
# } else {
# name = caseConvertIdentifierForRails(connection, name);
# }
columns = (1..meta_data.columnCount).collect{|i| meta_data.getColumnName(i)}
while result_set.next do
# In the past we used Hash[columns.zip(row)]
# though elegant, the verbose way is much more efficient
# both time and memory wise cause it avoids a big array allocation
# this method is called a lot and needs to be micro optimised
hash = {}
index = 0
length = columns.size
while index < length
raw = result_set.getObject(index+1)
# Protected method in Java::ArjdbcJdbc::RubyJdbcConnection
# value = jdbcToRuby(index+1, meta_data.getColumnType(index+1), result_set)
hash[columns[index]] = raw
index += 1
end
yield hash
end
ensure
stmt.close if stmt
end
end
end
end
# MySQL2 Driver
unless defined?(Java)
module ActiveRecord
module ConnectionAdapters
class Mysql2Adapter
# Returns each row a hash
# Example:
# Inquiry.connection.exec_query_each('select * from inquiries') do |row|
# ap row
# end
def exec_query_each(sql, name = 'SQL', binds = [])
result = execute(sql, name)
# We freeze the strings to prevent them getting duped when
# used as keys in ActiveRecord::Base's @attributes hash
columns = result.fields.map { |c| c.dup.freeze }
result.each do |row|
# In the past we used Hash[columns.zip(row)]
# though elegant, the verbose way is much more efficient
# both time and memory wise cause it avoids a big array allocation
# this method is called a lot and needs to be micro optimised
hash = {}
index = 0
length = columns.length
while index < length
hash[columns[index]] = row[index]
index += 1
end
yield hash
end
end
end
end
end
end
class ActiveRecord::Relation
# Returns one hash at a time for every row returned from the database
#
# Note:
# This call is performed on a separate database connection outside the
# current transaction so that other database calls can be performed
# in the block
#
# Warning:
# At this time only the following types are handled:
# * String
# * Integer
# * Float
# * Date
# * Time
# All other types must be manually converted
#
# Example:
# Inquiry.all.each_hash {|row| ap row}
def each_hash(&block)
# In test it uses transactional fixtures
return each { |model| block.call(model.attributes) } if Rails.env.test?
# Use a new database connection so that other database calls can be performed
# in the block
# Uses the Slave connection if configured from ActiveRecordSlave
begin
pool = defined?(ActiveRecordSlave::Slave) ? ActiveRecordSlave::Slave.connection_pool : ActiveRecord::Base.connection_pool
conn = pool.checkout
if defined?(JRuby)
conn.exec_query_each(to_sql) do |row|
row.each_pair do |key, val|
case
when val.is_a?(Java::JavaSql::Date)
row[key] = Date.parse(val.to_s)
when val.is_a?(Java::JavaSql::Timestamp)
row[key] = Time.parse(val.to_s)
when val.is_a?(Java::JavaSql::Time)
# Assumes UTC
row[key] = Time.new(0, 0, 0, val.hours, val.minutes, val.seconds)
# else 'String', 'Fixnum', 'Float'
end
end
block.call(row)
end
else
conn.exec_query_each(to_sql, &block)
end
ensure
pool.checkin(conn) if conn
end
end
# Returns one Model instance at a time for every row returned from the database
#
# Note:
# This call is performed on a separate database connection outside the
# current transaction so that other database calls can be performed
# in the block
#
# Warning:
# At this time only the following types are handled:
# * String
# * Integer
# * Float
# All other types must be manually converted first by calling #each_hash
#
# Example:
# Inquiry.all.each_row {|inquiry| ap inquiry}
def each_row(&block)
each_hash { |row| block.call(model.instantiate(row)) }
end
end
@reidmorrison
Copy link
Author

We use this solution in production for handling very large batches where it is more efficient to only perform a single query.

This will work against MySQL databases in both MRI and JRuby. To add support for other databases, their connection adapters will need the method exec_query_each to be defined. This code should work as-is with any JDBC database when running JRuby.

Using find_in_batches results in the same SQL being called against the database every 1000 records with just different id ranges. Depending on the query and size of the database it could be very inefficient.
A new database connection is used for performing the query, with each record being returned from the database one at a time. Any code within the block will use the current database connection already assigned to the current thread.

This solution gives our batch processing environments the ability to scale well beyond the solution offered by ActiveRecord find_in_batches, or find_each.
In particular the use of each_hash avoids the overhead of creating an instance of the model which also helps with performance where applicable.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment