@jonathanslenders
Created June 19, 2022 15:04
download_and_index.py
# - Short snippet extracted from a commercial Cisco project, to justify
#   adding a `str.splitlines(return_indexes=True)` feature to Python. -
import asyncio
from contextlib import aclosing  # Python 3.10+
from itertools import accumulate, chain


async def _start_download_and_index(self) -> None:
    """
    Download the file content (as a binary stream), index the lines
    (keep the line index in memory), and store the file content
    locally on disk. This needs to be as efficient as possible.
    We're dealing with huge (>1GB) files, but users typically view
    only a small portion of the file.
    """
    run_in_executor = asyncio.get_event_loop().run_in_executor

    line_positions = self.line_positions  # (Empty list at this point.)
    line_positions.append(0)  # Add start of very first line.
    byte_position = 0

    # NOTE: We disable buffering in the file that we write, because the
    #       file can be opened for reading elsewhere (to get the header, or
    #       specific lines). If we don't do this, we have to call at least
    #       `flush` before setting 'bytes_available', and after updating
    #       line positions.

    # Download file to local storage.
    def write_chunk() -> None:
        chunk_len = len(chunk)
        if chunk.count(b"\0") == chunk_len:
            # If the input contains only zeros, don't actually write the
            # chunk to disk, but seek ahead to the corresponding position.
            # This way, we create a sparse file if the underlying
            # filesystem supports it. This is much quicker, and we don't
            # consume any actual disk space. We had such a situation with a
            # lastlog file:
            # https://en.wikipedia.org/wiki/Lastlog
            f.seek(f.tell() + chunk_len)
            f.truncate()
        else:
            f.write(chunk)

    with open(self.local_path, "wb", buffering=0) as f:
        async with aclosing(self._safe_download_file()) as iterator:
            async for chunk in iterator:
                # Write asynchronously: if the disk I/O becomes saturated,
                # we still want to keep the event loop as responsive as
                # possible, so that incoming network requests get handled
                # quickly. (This is similar to what aiofiles would do.)
                await run_in_executor(None, write_chunk)

                # Search for line endings.
                # (This is the most efficient implementation I found so
                # far. `splitlines(True)` will split using universal line
                # endings, supporting \r, \n and \r\n, and it will keep the
                # endings, so that we can map `len` to it.)
                # NOTE: In case of utf-16, splitlines doesn't work
                #       correctly. It doesn't incorporate the \0 byte
                #       following \n into the line ending. This means that
                #       if we store the offsets like this, the next line
                #       will start with one or more zero bytes.
                #       >>> 'hello\nworld\n'.encode('utf-16') \
                #       ...     .splitlines()[1].decode('utf-16', 'ignore')
                #       '眀漀爀氀搀'
                length_iterator = accumulate(
                    chain([byte_position], map(len, chunk.splitlines(True)))
                )
                next(length_iterator)  # Skip first item.
                line_positions.extend(length_iterator)

                # Only keep the last index if the chunk actually ends on a
                # line ending.
                if chunk and not chunk.endswith((b"\r", b"\n")):
                    line_positions.pop()

                # Update byte position.
                byte_position += len(chunk)
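
For reference, the line-indexing idiom above can be exercised on its own. Below is a minimal, self-contained sketch (with an illustrative chunk and none of the class context) showing how `accumulate`, `chain` and `splitlines(True)` yield the byte offsets at which each line starts; the proposed `str.splitlines(return_indexes=True)` would presumably return such offsets directly.

from itertools import accumulate, chain

# Illustrative chunk only; in the real code this comes from the download stream.
chunk = b"first line\nsecond line\r\nthird, unterminated"
byte_position = 0
line_positions = [0]  # Start of the very first line.

# Accumulate the lengths of the lines (line endings kept) to obtain the byte
# offset at which every following line starts.
length_iterator = accumulate(
    chain([byte_position], map(len, chunk.splitlines(True)))
)
next(length_iterator)  # Skip the first item (the current byte position).
line_positions.extend(length_iterator)

# Drop the last offset if the chunk did not end on a line ending.
if chunk and not chunk.endswith((b"\r", b"\n")):
    line_positions.pop()

print(line_positions)  # [0, 11, 24]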