@alebian
Last active September 20, 2018 12:55
Gist: alebian/aa2b1f926243e178f6ce72b60f172741

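# Streams the rows of an ActiveRecord relation through a series of
# LIMIT/OFFSET queries, keeping only one batch in memory at a time.
class DatabaseRowStream
  include Enumerable

  BATCH_SIZE = 20_000

  # sql     - ActiveRecord::Relation to read. Give it an explicit ORDER BY:
  #           without one, OFFSET pages are not guaranteed to be stable and
  #           rows may be skipped or repeated.
  # options - :pluck      column name, or array of column names, to pluck
  #                       instead of loading full records
  #           :batch_size rows fetched per query (default BATCH_SIZE)
  def initialize(sql, options = {})
    @sql = sql
    @pluck = options[:pluck] ? Array(options[:pluck]) : nil
    @batch_size = options[:batch_size] || BATCH_SIZE
    @total_cursor = 0
    # Start at the end of an empty batch so the first read triggers a query.
    # Must be @batch_size (not BATCH_SIZE), otherwise a custom :batch_size
    # never triggers the first query and the stream looks empty.
    @batch_cursor = @batch_size
    @buffer = []
  end

  # Yields each remaining row. Reading consumes the stream, so a second
  # call only yields rows that have not been returned yet.
  def each
    return enum_for(:each) unless block_given?

    # Loop on next? rather than on the row itself, so a plucked NULL (nil)
    # or false value does not end the stream early.
    yield read_buffer_or_fetch while next?
  end

  # Returns the next row, or nil once the stream is exhausted. Use #next?
  # to tell a nil column value apart from the end of the stream.
  def next
    read_buffer_or_fetch
  end

  # True if another row is available; may issue a query to find out.
  def next?
    retrieve_more_data_if_necessary
    @batch_cursor < @buffer.size
  end

  private

  def read_buffer_or_fetch
    return nil unless next?

    line = @buffer[@batch_cursor]
    @total_cursor += 1
    @batch_cursor += 1
    line
  end

  # Loads the next page once the current one has been read completely.
  # A short final page leaves @batch_cursor below @batch_size, so no
  # further queries are issued after it.
  def retrieve_more_data_if_necessary
    return unless @batch_cursor == @batch_size

    page = @sql.limit(@batch_size).offset(@total_cursor)
    # pluck(*columns) returns bare values for one column and arrays for
    # several; otherwise the batch is loaded as an Array of records.
    @buffer = @pluck ? page.pluck(*@pluck) : page.select('*').to_a
    @batch_cursor = 0
  end
end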
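The class pages through a relation with LIMIT/OFFSET and stops once a page comes back shorter than the batch size. As a rough sketch of the same paging idea in plain Ruby, with no database involved, the loop can also be written with `Enumerator`. `paged_stream` and its `(limit, offset)` block are hypothetical names, and an in-memory array stands in for the query; with ActiveRecord the block would presumably be something like `relation.limit(limit).offset(offset).to_a`.

```ruby
# Hypothetical helper (not part of the gist): lazily yields every row from a
# source that can be queried one page at a time with a (limit, offset) pair.
def paged_stream(batch_size:, &fetch_page)
  raise ArgumentError, 'batch_size must be positive' unless batch_size.positive?

  Enumerator.new do |yielder|
    offset = 0
    loop do
      page = fetch_page.call(batch_size, offset)
      page.each { |row| yielder << row }
      # A short (or empty) page means there is nothing left to fetch.
      break if page.size < batch_size

      offset += batch_size
    end
  end
end

# In-memory stand-in for a table; records each (limit, offset) "query".
rows = (1..7).to_a
queries = []
stream = paged_stream(batch_size: 3) do |limit, offset|
  queries << [limit, offset]
  rows[offset, limit] || []
end

p stream.to_a # prints [1, 2, 3, 4, 5, 6, 7]
p queries     # prints [[3, 0], [3, 3], [3, 6]]
```

Because the enumerator is lazy, `stream.first(2)` issues only the first query. Like the class, this assumes the source returns rows in a stable order.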