require 'thread' | |
WORKERS = 3 | |
queue = SizedQueue.new(WORKERS * 2) | |
# Backlog some work! Using a separate thread since push operation on | |
# SizedQueue may block when it's "full" | |
Thread.new { MyModel.find_in_batches { |batch| queue << batch } } | |
readio, writeio = IO.pipe | |
# This is the worker pool. Create a fixed number of threads to get work | |
# (ie. an ActiveRelation batch) off the queue and run the CSV transformation | |
exporters = 1.upto(WORKERS).map do | |
Thread.new { | |
until queue.empty? do | |
group = queue.pop | |
group.each { |record| writeio.write MyCSVTransformation.call(record) } | |
end | |
} | |
end | |
# IO "guard" cleans up when all exporter threads have finished their work | |
Thread.new { | |
while exporters.any?(&:alive?) do sleep 0.01 end | |
writeio.close | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment