Instantly share code, notes, and snippets.

@lvonk /Cursor.rb
Last active Oct 31, 2016

Embed
What would you like to do?
Simple postgres cursor
# Use as:
#
#   Cursor.new(Model.includes(:foo_bars, foos: [:bar]).where('id < 1000000')).find_each do |model|
#     # do something
#   end
#
# Inspired by https://github.com/afair/postgresql_cursor
# Streams the rows of an ActiveRecord relation through a server-side
# PostgreSQL cursor, instantiating and yielding the records batch by batch
# instead of loading the whole result set at once.
#
# The cursor is declared through a prepared statement on the raw connection
# and lives inside a single ActiveRecord transaction for the duration of
# #find_each.
class Cursor
  # Default observer: ignores every notification.
  # A custom observer must respond to the same two methods.
  class NoopObserver
    # Called with the exception raised while closing the cursor.
    def close_failed(e)
    end

    # Called before every FETCH with the requested batch size.
    def fetching(size)
    end
  end

  # @return [Integer] number of records yielded so far by this instance
  #   (accumulates across repeated #find_each calls; never reset).
  attr_reader :count

  # @param query the relation to iterate; it must respond to +klass+,
  #   +arel+, +bind_values+ and +includes_values+.
  # @param fetch_size [Integer] number of rows requested per FETCH.
  # @param observer object notified through +fetching(size)+ and
  #   +close_failed(error)+.
  def initialize(query, fetch_size: 1000, observer: NoopObserver.new)
    @query = query
    # Dashes are stripped so both names are plain SQL identifiers that can be
    # interpolated unquoted into the statements below.
    @cursor_name = "my-cursor-#{SecureRandom.uuid}".delete('-')
    @stmt_name = "my-stmt-#{SecureRandom.uuid}".delete('-')
    @count = 0
    @fetch_size = fetch_size
    @observer = observer
  end

  # Yields every record of the query, fetched +fetch_size+ rows at a time.
  # Each record is built with +klass.new+ from the symbolized row, has the
  # query's +includes_values+ preloaded and is marked readonly before it is
  # yielded.
  #
  # @yieldparam record an instance of the query's +klass+
  # @return [Enumerator] when called without a block; nothing is sent to the
  #   database until the enumerator is consumed.
  def find_each
    return enum_for(:find_each) unless block_given?

    ActiveRecord::Base.transaction do
      begin
        open
        # An empty batch means the cursor is exhausted.
        until (rows = fetch).empty?
          records = rows.map { |row| @query.klass.new(row.symbolize_keys) }
          load_associations!(records)
          records.each do |record|
            @count += 1
            record.readonly!
            yield(record)
          end
        end
      ensure
        # Best effort: failures are reported to the observer, not raised.
        close
      end
    end
  end

  private

  # Preloads every association listed in the query's +includes_values+ onto
  # the given batch of records.
  def load_associations!(rows)
    preloader = ActiveRecord::Associations::Preloader.new
    @query.includes_values.each do |associations|
      preloader.preload rows, associations
    end
  end

  # Grabs the raw connection, prepares the DECLARE statement if this instance
  # has not done so yet, and executes it with the query's bind values cast
  # for the database.
  def open
    @conn = ActiveRecord::Base.connection.raw_connection
    prepare unless @prepared
    binds = @query.arel.bind_values + @query.bind_values
    @conn.exec_prepared(@stmt_name, binds.map { |c, value| c.cast_type.type_cast_for_database(value) })
  end

  # Prepares the statement that declares the cursor over the query's SQL.
  def prepare
    @conn.prepare(@stmt_name, "DECLARE #{@cursor_name} CURSOR FOR #{@query.arel.to_sql}")
    @prepared = true
  end

  # Notifies the observer, then fetches the next batch from the cursor.
  #
  # @return [Array] the fetched rows; empty once the cursor is exhausted.
  def fetch
    @observer.fetching(@fetch_size)
    @conn.exec("FETCH #{@fetch_size} IN #{@cursor_name}").to_a
  end

  # Closes the cursor and releases the prepared statement. Any error is
  # handed to the observer instead of being raised, so cleanup never masks
  # an exception raised while iterating.
  def close
    @conn.exec("CLOSE #{@cursor_name}")
    deallocate
  rescue => e
    @observer.close_failed(e)
  end

  # Drops the prepared statement so it does not accumulate on the
  # connection. +@prepared+ is only reset once DEALLOCATE succeeded, so a
  # failed cleanup never leads to preparing an already existing name again.
  def deallocate
    return unless @prepared

    @conn.exec("DEALLOCATE #{@stmt_name}")
    @prepared = false
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment