# Turn a line-based Enumerator into an Enumerator
# that parses each CSV line and yields it as a Hash row.
# Usage:
#   csv = CSVStream.new(line_stream, headers: { product_name: 0, price: 2 })
#   csv.each { |row| ... }
#
# With a bit more work it could detect the CSV headers automatically
# from the first line (see the sketch after the class).
require 'csv'

class CSVStream
  include Enumerable

  # @param enum [Enumerator] an enumerator that yields lines of text
  # @param headers [Hash<Symbol, Integer>] keys are column names, values are
  #   the indices in each CSV line where those columns are expected
  def initialize(enum, headers:)
    @enum = enum.to_enum
    @headers = headers
  end

  # Enumerable is built on #each, so defining it in terms of the lazy
  # enumerator gives us #to_a, #take, #map, etc. for free
  # (plain Ruby; ActiveSupport's `delegate` isn't available here).
  def each(&block)
    to_enum.each(&block)
  end

  def to_enum
    Enumerator.new do |y|
      @enum.each { |line| y << row(CSV.parse_line(line)) }
    end
  end

  # Map a parsed CSV line (an Array) to a Hash keyed by column name.
  private def row(list)
    @headers.each.with_object({}) { |(key, idx), memo| memo[key] = list[idx] }
  end
end
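
# The comment above suggests detecting headers automatically from the first
# line. Here is a minimal sketch of that idea; the `with_auto_headers`
# constructor is hypothetical (not part of the original gist) and assumes
# the first yielded line is a header row:
class CSVStream
  def self.with_auto_headers(enum)
    lines = enum.to_enum
    # Consume the header row and build the { column_name: index }
    # mapping that #initialize expects.
    headers = CSV.parse_line(lines.next)
      .each_with_index
      .to_h { |name, idx| [name.to_sym, idx] }
    # Wrap the remaining lines in a fresh, single-pass enumerator:
    # Enumerator#next remembers its position, and Kernel#loop
    # terminates cleanly on StopIteration.
    rest = Enumerator.new { |y| loop { y << lines.next } }
    new(rest, headers: headers)
  end
end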
stream = S3Stream.build(bucket: 'stuff', key: 'bigfile.csv')
  .then { |chunks| LineStream.new(chunks) }
  .then { |lines| CSVStream.new(lines, headers: { product_name: 0, price: 2 }) }

stream.take(100).each do |row|
  puts row[:price].inspect
end
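
# Because each stage only needs an object that responds to #to_enum, the
# whole pipeline can be exercised in memory, with a plain Array of string
# chunks standing in for S3Stream (the chunk values below are made up):
chunks = ["name,sku,pr", "ice\nWidget,W1,9.99\nGad", "get,G2,4.50\n"]

rows = chunks
  .then { |c| LineStream.new(c) }
  .then { |lines| CSVStream.new(lines, headers: { product_name: 0, price: 2 }) }

rows.to_a
# => [{ product_name: "name", price: "price" },
#     { product_name: "Widget", price: "9.99" },
#     { product_name: "Gadget", price: "4.50" }]
# (The header line comes through as a data row because the header
# indices were given explicitly.)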
# Turn a chunks-based Enumerator into a line-based one,
# i.e. it buffers chunks and yields each "\n"-delimited line as it becomes available.
# Usage:
#   lines = LineStream.new(s3_stream)
#   lines.each { |line| ... }
require 'strscan'

class LineStream
  include Enumerable

  def initialize(chunks, separator: $/)
    @chunks = chunks.to_enum
    # Treat a String separator as a literal; accept a Regexp as-is.
    @separator = separator.is_a?(String) ? Regexp.new(Regexp.escape(separator)) : separator
  end

  def each(&block)
    to_enum.each(&block)
  end

  def to_enum
    Enumerator.new do |y|
      buffer = StringScanner.new(''.dup)
      @chunks.each do |chunk|
        buffer << chunk
        bytes = 0
        # Yield every complete line currently sitting in the buffer.
        while (line = buffer.scan_until(@separator))
          bytes += line.bytesize
          # scan_until includes the matched separator; strip it before
          # yielding (chomp would only handle newlines).
          y << line.delete_suffix(buffer.matched)
        end
        # Trim the consumed prefix so the buffer doesn't grow unbounded.
        buffer.string = buffer.string.byteslice(bytes..-1) if bytes > 0
      end
      # Whatever remains after the last chunk is the final, unterminated line.
      y << buffer.rest unless buffer.rest.empty?
    end
  end
end
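
# A quick sanity check (hypothetical input) that lines split across chunk
# boundaries are reassembled, and that a literal String separator works:
LineStream.new(["ab\ncd", "ef\ngh"]).to_a
# => ["ab", "cdef", "gh"]  # "gh" is the final, unterminated line

LineStream.new(["a|b|", "c"], separator: '|').to_a
# => ["a", "b", "c"]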
# Paginate chunked (ranged) requests to an S3 object
# and present it as an Enumerator that yields each chunk.
# Usage:
#   s3_stream = S3Stream.build(bucket: 'stuff', key: 'bigfile.csv')
#   s3_stream.each { |chunk| ... }
require 'aws-sdk-s3'

class S3Stream
  include Enumerable

  DEFAULT_REGION = 'eu-west-1'
  CHUNK_SIZE = 100 * 1024 # 100 KiB per ranged request

  def self.build(region: DEFAULT_REGION, **kargs)
    client = Aws::S3::Client.new(region:)
    new(client:, **kargs)
  end

  def initialize(client:, bucket:, key:, chunk_size: CHUNK_SIZE)
    @client = client
    @bucket = bucket
    @key = key
    @chunk_size = chunk_size
  end

  # Needed for the `s3_stream.each { |chunk| ... }` usage above.
  def each(&block)
    to_enum.each(&block)
  end

  def to_enum
    Enumerator.new do |y|
      total = Float::INFINITY
      offset = 0
      while offset < total
        # HTTP byte ranges are inclusive of both ends.
        limit = offset + chunk_size
        resp = client.get_object(bucket:, key:, range: "bytes=#{offset}-#{limit}")
        # Content-Range looks like "bytes 0-102400/5242880"; the part
        # after the slash is the total object size, which bounds the loop.
        total = resp.content_range.split('/').last.to_i
        offset = limit + 1
        y << resp.body.read
      end
    end
  end

  private

  attr_reader :bucket, :key, :client, :chunk_size
end
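
# Hypothetical test sketch: drive S3Stream against a stubbed client so no
# real AWS calls are made. The aws-sdk supports `stub_responses: true` and
# proc-based stubs; the object body and sizes below are invented.
body = "name,sku,price\n" + ("Widget,W1,9.99\n" * 1_000)

client = Aws::S3::Client.new(region: 'eu-west-1', stub_responses: true)
client.stub_responses(:get_object, lambda { |context|
  # Parse the requested inclusive byte range, e.g. "bytes=0-102400".
  from, to = context.params[:range].match(/bytes=(\d+)-(\d+)/).captures.map(&:to_i)
  chunk = body.byteslice(from..[to, body.bytesize - 1].min)
  {
    body: chunk,
    content_range: "bytes #{from}-#{from + chunk.bytesize - 1}/#{body.bytesize}"
  }
})

stream = S3Stream.new(client: client, bucket: 'stuff', key: 'bigfile.csv', chunk_size: 1024)
stream.to_a.join == body # => true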