Skip to content

Instantly share code, notes, and snippets.

@Bajena
Created January 29, 2020 06:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bajena/b9aa2e8582477626a9ae677d0ba629e7 to your computer and use it in GitHub Desktop.
Save Bajena/b9aa2e8582477626a9ae677d0ba629e7 to your computer and use it in GitHub Desktop.
# frozen_string_literal: true
require "net/ftp"
require "zlib"
# Streams a gzip-compressed CSV file from an FTP server and exposes its rows
# one at a time, so the whole file never has to be held in memory.
#
# The work is done by a chain of enumerators:
#   FTP binary chunks -> inflated chunks -> complete lines -> parsed rows
#
# @example
#   Loader.new(host: "ftp.example.com").load.each { |row| p row }
class Loader
  DEFAULT_CHUNK_SIZE = 1024

  # @param host [String] bare FTP hostname; Net::FTP expects a hostname,
  #   not a URL, so do not include an "ftp://" scheme
  # @param user [String] FTP login name
  # @param password [String] FTP password
  # @param remote_path [String] path of the gzipped CSV on the server
  # @param chunk_size [Integer] number of bytes requested per FTP read
  def initialize(host: "host.com", user: "user", password: "pass",
                 remote_path: "file.csv.gz", chunk_size: DEFAULT_CHUNK_SIZE)
    @host = host
    @user = user
    @password = password
    @remote_path = remote_path
    @chunk_size = chunk_size
  end

  # Builds an enumerator over the data rows of the remote file. The header
  # line is skipped. Nothing is downloaded until the enumerator is consumed.
  #
  # @return [Enumerator<Array<String>>] one array of field strings per row
  def load
    Enumerator.new { |main_enum| stream(main_enum) }
  end

  private

  attr_reader :ftp, :inflater, :host, :user, :password, :remote_path, :chunk_size

  # Opens the connection and inflater, pushes every parsed data row into
  # +main_enum+, and always releases both resources afterwards.
  def stream(main_enum)
    init_ftp_connection
    init_gzip_inflater
    # Drop the header line
    split_lines.lazy.drop(1).each { |line| main_enum << preprocess_row(line) }
  ensure
    release_resources
  end

  # Closes the FTP connection and the inflater. The instance variables are
  # cleared first so a later run never tries to close a stale, already-closed
  # handle, and the nested ensure guarantees the inflater is closed even when
  # closing the FTP connection raises.
  def release_resources
    connection = @ftp
    zstream = @inflater
    @ftp = nil
    @inflater = nil
    begin
      connection&.close
    ensure
      zstream&.close
    end
  end

  # Re-assembles decompressed chunks into complete lines.
  #
  # @return [Enumerator<String>] each line, including its trailing "\n" when
  #   one is present; a final line without a newline is yielded as-is
  def split_lines
    Enumerator.new do |yielder|
      # Created per enumeration so no leftover state leaks between runs.
      buffer = String.new
      ungzip.each do |decompressed_chunk|
        buffer << decompressed_chunk
        remainder = String.new
        buffer.each_line do |line|
          if line.end_with?("\n")
            yielder << line
          else
            # Incomplete tail: keep it until the next chunk completes it.
            remainder << line
          end
        end
        buffer = remainder
      end
      # The file may not end with a newline; do not lose its last line.
      yielder << buffer unless buffer.empty?
    end
  end

  # Inflates every compressed chunk coming from the FTP stream.
  #
  # @return [Enumerator<String>] decompressed chunks of arbitrary size
  def ungzip
    Enumerator.new do |yielder|
      stream_file_from_ftp.each do |compressed|
        inflater.inflate(compressed) do |decompressed_chunk|
          yielder << decompressed_chunk
        end
      end
    end
  end

  # Downloads the remote file in binary mode without writing a local copy
  # (the local file argument is nil), yielding each chunk as it arrives.
  #
  # @return [Enumerator<String>] raw compressed chunks
  def stream_file_from_ftp
    Enumerator.new do |ftp_stream_enum|
      ftp.getbinaryfile(remote_path, nil, chunk_size) do |chunk|
        ftp_stream_enum << chunk
      end
    end
  end

  # Connects and logs in to the FTP server, then switches to passive mode.
  def init_ftp_connection
    @ftp = Net::FTP.new(host, user, password).tap { |f| f.passive = true }
  end

  def init_gzip_inflater
    # Taken from examples in:
    # https://docs.ruby-lang.org/en/2.0.0/Zlib/Inflate.html
    # Adding 32 to the window bits enables automatic gzip/zlib header detection.
    @inflater = Zlib::Inflate.new(Zlib::MAX_WBITS + 32)
  end

  # Turns one raw line into an array of fields.
  #
  # This is a naive split: every double quote is removed and the line is cut
  # on every comma, so quoted fields containing commas are not supported and
  # trailing empty fields are dropped by String#split.
  #
  # @param row [String] a single line, with or without a trailing newline
  # @return [Array<String>]
  def preprocess_row(row)
    row.chomp.delete('"').split(",")
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment