Return a single row at a time from ActiveRecord without having to load them all into memory first
Gist reidmorrison/1356f5811de522a2495a, created November 13, 2014 20:06
if defined?(Java)
  class ActiveRecord::ConnectionAdapters::JdbcAdapter
    # Returns each row as a hash
    # Example:
    #   Inquiry.connection.exec_query_each('select * from inquiries') do |row|
    #     ap row
    #   end
    def exec_query_each(sql, name = 'SQL', binds = [], &block)
      log(sql, name) { @connection.exec_query_each(sql, binds, &block) }
    end
  end
end
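The adapter method above only wraps the connection-level call in log. The SQL therefore appears in the Rails log, and rows are still streamed straight through to the caller's block. The pure-Ruby sketch below shows that delegation pattern. FakeRawConnection and SketchAdapter, including its simplified log method, are hypothetical stand-ins and not ArJdbc classes.

```ruby
# Hypothetical stand-in for the low-level JDBC connection:
# it yields each row to the block instead of returning an array.
class FakeRawConnection
  def initialize(rows)
    @rows = rows
  end

  def exec_query_each(_sql, _binds = [])
    @rows.each { |row| yield row }
  end
end

# Hypothetical adapter with the same shape as JdbcAdapter#exec_query_each:
# it logs the statement and forwards the caller's block unchanged.
class SketchAdapter
  attr_reader :log_lines

  def initialize(raw_connection)
    @connection = raw_connection
    @log_lines  = []
  end

  def exec_query_each(sql, name = 'SQL', binds = [], &block)
    log(sql, name) { @connection.exec_query_each(sql, binds, &block) }
  end

  private

  # Simplified logger (assumption): records the SQL, then runs the wrapped block.
  def log(sql, name)
    @log_lines << "#{name}: #{sql}"
    yield
  end
end

adapter = SketchAdapter.new(FakeRawConnection.new([{ 'id' => 1 }, { 'id' => 2 }]))
seen = []
adapter.exec_query_each('select * from inquiries') { |row| seen << row['id'] }
```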
if defined?(Java)
  class ActiveRecord::ConnectionAdapters::JdbcConnection
    # Streams the rows for sql one at a time, yielding each row as a hash
    # keyed by column name. Values are the raw JDBC objects (for example
    # java.sql.Date); Relation#each_hash converts the date and time types.
    # binds is accepted for API compatibility but is not used.
    def exec_query_each(sql, binds = [])
      begin
        stmt = @connection.createStatement(
          java.sql::ResultSet::TYPE_FORWARD_ONLY,
          java.sql::ResultSet::CONCUR_READ_ONLY,
          java.sql::ResultSet::CLOSE_CURSORS_AT_COMMIT
        )
        # Integer::MIN_VALUE tells MySQL Connector/J to stream rows one at a
        # time instead of buffering the whole result set in memory.
        # Other JDBC drivers may reject a negative fetch size.
        stmt.fetch_size = java.lang::Integer::MIN_VALUE
        result_set = stmt.executeQuery(sql)
        meta_data  = result_set.get_meta_data
        # Freeze the column names so they are not duped when used as keys
        # in ActiveRecord::Base's @attributes hash.
        # Note: ArJdbc itself uses getColumnLabel (optionally lower-cased or
        # case-converted for Rails); the raw column name is used here.
        columns = (1..meta_data.columnCount).collect { |i| meta_data.getColumnName(i).freeze }
        length  = columns.size
        while result_set.next do
          # Hash[columns.zip(row)] is more elegant, but this explicit loop is
          # faster and uses less memory because it avoids allocating an
          # intermediate array for every row.
          hash  = {}
          index = 0
          while index < length
            # Raw JDBC value; ArJdbc's jdbcToRuby conversion is a protected
            # method of Java::ArjdbcJdbc::RubyJdbcConnection and is not used.
            hash[columns[index]] = result_set.getObject(index + 1)
            index += 1
          end
          yield hash
        end
      ensure
        stmt.close if stmt
      end
    end
  end
end
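A JDBC ResultSet behaves as a cursor. next advances one row, and getObject(i) reads the 1-based column i, so only the current row has to be materialized. The sketch below runs the same row loop against a hypothetical in-memory FakeResultSet, which is not a JDBC class. It shows each row turning into a hash keyed by column name without an array of all rows ever being built.

```ruby
# Hypothetical in-memory stand-in for java.sql.ResultSet: a forward-only
# cursor with 1-based column indexes.
class FakeResultSet
  def initialize(rows)
    @rows  = rows
    @index = -1
  end

  # Advances to the next row; returns false once the rows are exhausted.
  def next
    @index += 1
    @index < @rows.size
  end

  # Reads a column of the current row (1-based, as in JDBC).
  def getObject(column)
    @rows[@index][column - 1]
  end
end

# Same loop structure as JdbcConnection#exec_query_each.
def each_row_hash(result_set, columns)
  length = columns.size
  while result_set.next
    hash  = {}
    index = 0
    while index < length
      hash[columns[index]] = result_set.getObject(index + 1)
      index += 1
    end
    yield hash
  end
end

columns = %w[id name].map(&:freeze)
rs  = FakeResultSet.new([[1, 'a'], [2, 'b']])
out = []
each_row_hash(rs, columns) { |h| out << h }
```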
# MySQL2 Driver
unless defined?(Java)
  module ActiveRecord
    module ConnectionAdapters
      class Mysql2Adapter
        # Returns each row as a hash
        # Example:
        #   Inquiry.connection.exec_query_each('select * from inquiries') do |row|
        #     ap row
        #   end
        def exec_query_each(sql, name = 'SQL', binds = [])
          result = execute(sql, name)
          # Freeze the column names so they are not duped when used as keys
          # in ActiveRecord::Base's @attributes hash
          columns = result.fields.map { |c| c.dup.freeze }
          length  = columns.length
          result.each do |row|
            # Hash[columns.zip(row)] is more elegant, but this explicit loop
            # is faster and uses less memory because it avoids allocating an
            # intermediate array for every row. This runs once per row, so
            # it is worth micro-optimizing.
            hash  = {}
            index = 0
            while index < length
              hash[columns[index]] = row[index]
              index += 1
            end
            yield hash
          end
        end
      end
    end
  end
end
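The explicit while loop and Hash[columns.zip(row)] build the same hash. The loop only skips the intermediate array of pairs. The sketch below checks that equivalence on made-up sample data. It also shows why the column names are frozen. In MRI, Hash#[]= stores a frozen copy of an unfrozen String key but keeps an already frozen key as-is, so every row hash can share the same key objects.

```ruby
# Builds a row hash with an explicit index loop, as in the adapter code.
def row_to_hash(columns, row)
  hash   = {}
  index  = 0
  length = columns.length
  while index < length
    hash[columns[index]] = row[index]
    index += 1
  end
  hash
end

# Illustrative data only.
columns = ['id', 'email'].map { |c| c.dup.freeze }
row     = [42, 'user@example.com']

loop_hash = row_to_hash(columns, row)
zip_hash  = Hash[columns.zip(row)]
```

The adapter builds columns once per query, so each yielded row hash reuses those frozen key strings rather than storing a fresh key per row.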
class ActiveRecord::Relation
  # Returns one hash at a time for every row returned from the database
  #
  # Note:
  #   This call is performed on a separate database connection outside the
  #   current transaction so that other database calls can be performed
  #   in the block
  #
  # Warning:
  #   At this time only the following types are handled:
  #   * String
  #   * Integer
  #   * Float
  #   * Date
  #   * Time
  #   All other types must be manually converted
  #
  # Example:
  #   Inquiry.all.each_hash {|row| ap row}
  def each_hash(&block)
    # Tests use transactional fixtures: the fixture data is uncommitted on the
    # current connection, so iterate normally instead of using a new connection
    return each { |model| block.call(model.attributes) } if Rails.env.test?

    # Use a separate database connection so that other database calls can be
    # performed in the block.
    # Uses the Slave connection if configured from ActiveRecordSlave
    begin
      pool = defined?(ActiveRecordSlave::Slave) ? ActiveRecordSlave::Slave.connection_pool : ActiveRecord::Base.connection_pool
      conn = pool.checkout
      if defined?(JRuby)
        conn.exec_query_each(to_sql) do |row|
          # Convert the JDBC date and time types to Ruby types; other values
          # (String, Integer, Float) are passed through unchanged
          row.each_pair do |key, val|
            case val
            when Java::JavaSql::Date
              row[key] = Date.parse(val.to_s)
            when Java::JavaSql::Timestamp
              row[key] = Time.parse(val.to_s)
            when Java::JavaSql::Time
              # Time of day only: use the 2000-01-01 dummy date that
              # ActiveRecord uses for time columns, and assume UTC.
              # (Time.new(0, 0, 0, ...) would raise: month and day must be >= 1)
              row[key] = Time.utc(2000, 1, 1, val.hours, val.minutes, val.seconds)
            end
          end
          block.call(row)
        end
      else
        conn.exec_query_each(to_sql, &block)
      end
    ensure
      pool.checkin(conn) if conn
    end
  end

  # Returns one Model instance at a time for every row returned from the database
  #
  # Note:
  #   This call is performed on a separate database connection outside the
  #   current transaction so that other database calls can be performed
  #   in the block
  #
  # Warning:
  #   At this time only the following types are handled:
  #   * String
  #   * Integer
  #   * Float
  #   All other types must be manually converted first by calling #each_hash
  #
  # Example:
  #   Inquiry.all.each_row {|inquiry| ap inquiry}
  def each_row(&block)
    each_hash { |row| block.call(model.instantiate(row)) }
  end
end
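each_hash checks out its extra connection and checks it back in from an ensure clause, so the connection goes back to the pool even if the block raises. For a java.sql.Time value, it builds a Ruby Time on a dummy date. The plain-Ruby sketch below covers both pieces. SimplePool, with_dedicated_connection and time_of_day are hypothetical helpers. They are not ActiveRecord's ConnectionPool, although that class does provide checkout and checkin.

```ruby
# Hypothetical minimal connection pool: hands out distinct connection
# objects and tracks which ones are currently checked out.
class SimplePool
  attr_reader :in_use

  def initialize(size)
    @available = Array.new(size) { |i| "conn-#{i}" }
    @in_use    = []
  end

  def checkout
    conn = @available.shift or raise 'pool exhausted'
    @in_use << conn
    conn
  end

  def checkin(conn)
    @in_use.delete(conn)
    @available.push(conn)
  end
end

# Runs the block on a dedicated connection and always returns it to the
# pool, following the begin/ensure structure of each_hash.
def with_dedicated_connection(pool)
  conn = pool.checkout
  yield conn
ensure
  pool.checkin(conn) if conn
end

# Time of day on the 2000-01-01 dummy date, interpreted as UTC.
def time_of_day(hours, minutes, seconds)
  Time.utc(2000, 1, 1, hours, minutes, seconds)
end

pool = SimplePool.new(2)
begin
  with_dedicated_connection(pool) { |_conn| raise 'boom' }
rescue RuntimeError
  # The error propagates, but the connection has already been checked in.
end
```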
We use this solution in production to handle very large batches where it is more efficient to perform only a single query.

It works against MySQL databases in both MRI and JRuby. To add support for other databases, their connection adapters need to define the method exec_query_each. When running JRuby, the JDBC version should also work with other JDBC databases, provided the driver accepts the streaming fetch size it sets (Integer::MIN_VALUE is a MySQL Connector/J convention that other drivers may reject).

With find_in_batches, the same SQL is run against the database every 1000 records, with only the id range changing. Depending on the query and the size of the database, this can be very inefficient. Here, a separate database connection is checked out of the pool to perform the query, and each record is returned from the database one at a time. Any code within the block uses the database connection already assigned to the current thread.
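The sketch below makes the trade-off concrete. It models find_in_batches-style keyset batching, where each batch is roughly WHERE id > last_id ORDER BY id LIMIT batch_size, against an in-memory array of ids and counts the round trips. each_hash, by contrast, issues a single query. batched_query_count is a hypothetical helper that mirrors the general approach, not Rails' exact implementation.

```ruby
# Counts how many queries keyset batching needs for the given ids.
# Each loop iteration simulates one
#   SELECT ... WHERE id > last_id ORDER BY id LIMIT batch_size
def batched_query_count(ids, batch_size)
  sorted  = ids.sort
  last_id = nil
  queries = 0
  loop do
    queries += 1
    batch = sorted.select { |id| last_id.nil? || id > last_id }.first(batch_size)
    break if batch.empty?
    last_id = batch.last
    # A short batch means there are no more rows to fetch.
    break if batch.size < batch_size
  end
  queries
end

ids = (1..2500).to_a
```

Each of those queries re-evaluates the full WHERE clause with a new id range. The streaming approach runs the statement once and reads the rows as they arrive.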
This solution lets our batch processing environments scale well beyond what ActiveRecord's find_in_batches or find_each offer. In particular, each_hash avoids the overhead of creating an instance of the model for every row, which further helps performance where applicable.
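each_row is each_hash plus one model.instantiate(row) call per row, and that per-row object is exactly what each_hash lets you skip. The plain-Ruby sketch below shows this layering. Inquiry is a hypothetical class whose instantiate only wraps the attributes hash. ActiveRecord's real instantiate does more work per row, such as running after_find and after_initialize callbacks. HashStream is a hypothetical stand-in for a relation.

```ruby
# Hypothetical model class: instantiate just wraps an attributes hash.
class Inquiry
  attr_reader :attributes

  def self.instantiate(attributes)
    new(attributes)
  end

  def initialize(attributes)
    @attributes = attributes
  end

  def email
    @attributes['email']
  end
end

# Stand-in for a relation: each_hash yields plain hashes, and
# each_row layers model instantiation on top of it.
class HashStream
  def initialize(model, rows)
    @model = model
    @rows  = rows
  end

  def each_hash(&block)
    @rows.each(&block)
  end

  def each_row(&block)
    each_hash { |row| block.call(@model.instantiate(row)) }
  end
end

stream = HashStream.new(Inquiry, [{ 'id' => 1, 'email' => 'a@example.com' }])
hashes = []
models = []
stream.each_hash { |row| hashes << row }
stream.each_row  { |inquiry| models << inquiry }
```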