Skip to content

Instantly share code, notes, and snippets.

@tpapp
Last active September 28, 2017 10:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tpapp/66f053bcab3f763a72318a9e8ef6e177 to your computer and use it in GitHub Desktop.
Save tpapp/66f053bcab3f763a72318a9e8ef6e177 to your computer and use it in GitHub Desktop.
reading UInt8 lines from a gzip compressed file in Julia
######################################################################
# context: reading UInt8 lines from a gzipped stream
# - the real dataset has about 5e10 lines, this is a self-contained MWE
# - the real dataset lines are then processed, this MWE is just about optimizing reading
# - line length can be bounded (relevant for buffered reading)
######################################################################
using CodecZlib
"""
dolines(f, io, [maxlines])
Call `f` on `io` repeatedly. `f` is supposed to return a `Vector{UInt8}`.
When `maxlines` > 0, read that many lines, otherwise read them all.
Return the total number of characters read, but this is just so that the value
from `f` is used somehow and not optimized away.
"""
function dolines(f, io, maxlines = -1)
count = 0
sumtotal = 0
while !eof(io) && (maxlines < 0 || count < maxlines)
line = f(io)
sumtotal += length(line)
count += 1
count % 10_000_000 == 0 && print(".")
end
sumtotal
end
"Just use the string from readline."
readline1(io) = convert(Vector{UInt8}, readline(io, chomp = false))
"Use readuntil directly."
readline2(io) = readuntil(io, 0x0a)
"""
Buffered read.
"""
mutable struct BufferedLine
"buffer"
buffer::Vector{UInt8}
"position of the last element in the buffer that was used up"
used::Int
"number of bytes in the buffer"
len::Int
BufferedLine(buffer_size::Int) = new(Vector{UInt8}(buffer_size), 0, 0)
end
"""
Read into a buffer, look for `\n`, return a view to the buffer.
When line is too long for the buffer, throw an error.
"""
function (bl::BufferedLine)(io::IO)
if bl.used > 0
@inbounds bl.buffer[1:(bl.len-bl.used)] .= bl.buffer[(bl.used+1):bl.len]
bl.len -= bl.used
bl.used = 0
end
nb = readbytes!(io, @view(bl.buffer[(bl.len+1):end]), length(bl.buffer)-bl.len)
bl.len += nb
bl.used = findfirst(bl.buffer, 0x0a)
bl.used == 0 && error("line longer than buffer size")
@view bl.buffer[1:bl.used]
end
######################################################################
# runtime
######################################################################
# make a file to read - for the MWE
"""
    create_file(filename, N)

Write `N` lines to a new gzip-compressed file at `filename`. Each line holds two random
`Int128` values, each followed by a `;`.

# Throws
- `AssertionError` if `filename` already exists; an existing file is never overwritten.
"""
function create_file(filename, N)
    # Explicit throw (same exception type and message as the former `@assert`), so the
    # check cannot be disabled together with assertions.
    isfile(filename) &&
        throw(AssertionError("$filename already exists, please delete manually"))
    io = GzipCompressionStream(open(filename, "w"))
    try
        for _ in 1:N
            # Both fields are interpolated; previously the second one was written out as
            # the literal text `rand(Int128)`.
            println(io, "$(rand(Int128));$(rand(Int128));")
        end
    finally
        # Always close the stream, even if writing fails part-way.
        close(io)
    end
    return nothing
end
# Benchmark driver: build the sample file once, then time each reader on the same
# decompression stream. Every reader is first run on a single line so that compilation is
# not part of the measured runs. The timing comments are recorded results of earlier runs.
filename = tempname()
create_file(filename, 10_000_000) # will take a while
# benchmark reading
io = GzipDecompressionStream(open(filename))
bl = BufferedLine(200)
try
    @time dolines(readline1, io, 1) # compile
    @time dolines(readline2, io, 1) # compile
    @time dolines(bl, io, 1) # compile
    # 0.537582 seconds (5.01 M allocations: 381.854 MiB, 10.55% gc time)
    @time dolines(readline1, io, 1_000_000)
    # 0.409361 seconds (2.01 M allocations: 152.972 MiB, 4.84% gc time)
    @time dolines(readline2, io, 1_000_000)
    # 0.584977 seconds (3.01 M allocations: 320.585 MiB, 6.90% gc time)
    @time dolines(bl, io, 1_000_000)
finally
    # Release the stream and delete the large temporary file, even if a run fails.
    close(io)
    rm(filename, force = true)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment