Skip to content

Instantly share code, notes, and snippets.

@gotmayonase
Last active June 16, 2017 03:57
Show Gist options
  • Save gotmayonase/8a377b9692a2636df1da75dff324c395 to your computer and use it in GitHub Desktop.
Save gotmayonase/8a377b9692a2636df1da75dff324c395 to your computer and use it in GitHub Desktop.
Process a CSV file in parallel using threads
require 'fileutils'
require 'csv'
# Processes a CSV file in parallel by cutting it into contiguous ranges of
# physical lines and parsing each range on its own thread.
#
# Caveats:
# * The file is split on physical lines, so a quoted field containing an
#   embedded newline that straddles a chunk boundary will not parse correctly.
# * The block is invoked from several threads at once; it must be thread-safe
#   and must not rely on rows arriving in file order.
class ParallelCSV
  attr_accessor :threads, :header_line, :opts

  # @param file_path [String] path of the CSV file
  # @param opts [Hash] recognised keys:
  #   :threads - number of worker threads (defaults to 2)
  #   :headers - when truthy, the first line of the file is read and kept as
  #              the header row
  #   Any remaining keys are kept in #opts (they are not forwarded to CSV).
  def initialize(file_path, opts = {})
    opts = opts.dup # never mutate the caller's hash
    @threads = opts.delete(:threads) || 2
    # Read the header with plain Ruby I/O rather than shelling out, so paths
    # containing spaces or shell metacharacters are handled safely.
    @header_line = File.open(file_path, &:gets) if opts.delete(:headers)
    @opts = opts
  end

  class << self
    # Parses +file_path+ in parallel and calls the block once per data row.
    #
    # @param file_path [String] path of the CSV file
    # @param opts [Hash] same options as ParallelCSV#initialize
    # @yield [row] a CSV::Row when :headers is set, otherwise an Array
    # @return [Array<Thread>] the (already joined) worker threads
    # @raise [ArgumentError] when no block is given
    def for_each(file_path, opts = {}, &block)
      raise ArgumentError, 'a block is required' unless block

      processor = new(file_path, opts)
      line_count = count_lines(file_path)
      puts "Line count: #{line_count}"

      # Clamp both values to at least 1 so the stepping below always advances.
      thread_count = [processor.threads.to_i, 1].max
      per_thread = [(line_count / thread_count.to_f).ceil, 1].max
      # The header (line 1) is supplied to CSV separately, so data starts at 2.
      first_data_line = processor.header_line ? 2 : 1

      workers = first_data_line.step(line_count, per_thread).map do |first_line|
        Thread.new(first_line) do |start_line|
          # Inclusive end of this chunk; the next chunk starts at last_line + 1.
          last_line = [start_line + per_thread - 1, line_count].min
          csv_string = read_lines(file_path, start_line, last_line).strip
          CSV.parse(csv_string, headers: processor.header_line || false) do |row|
            block.call(row)
          end
          puts "Finished processing lines #{start_line} - #{last_line}"
        end
      end

      # Thread#join re-raises any exception raised inside a worker.
      workers.each(&:join)
    end

    private

    # Counts physical lines, including a final line with no trailing newline.
    def count_lines(file_path)
      File.foreach(file_path).count
    end

    # Returns the text of lines first_line..last_line (1-based, inclusive).
    def read_lines(file_path, first_line, last_line)
      buffer = +''
      File.foreach(file_path).with_index(1) do |text, number|
        next if number < first_line
        break if number > last_line

        buffer << text
      end
      buffer
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment