Created
June 19, 2022 15:04
-
-
Save jonathanslenders/59ddf8fe2a0954c7f1865fba3b151868 to your computer and use it in GitHub Desktop.
download_and_index.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Short snippet extracted from a commercial Cisco project, to justify
# adding a `str.splitlines(return_indexes=True)` feature to Python.
async def _start_download_and_index(self) -> None: | |
""" | |
Download the file content (as a binary stream), index the lines | |
(keep the line index in memory), and store the file content | |
locally on disk. This needs to be as efficient as possible. | |
We're dealing with huge (>1GB) files, but users typically view | |
only a small portion of the file. | |
""" | |
run_in_executor = asyncio.get_event_loop().run_in_executor | |
line_positions = self.line_positions # (empty list at this point.) | |
line_positions.append(0) # Add start of very first line. | |
byte_position = 0 | |
# NOTE: We disable buffering in the file that we write, because the | |
# file can be opened for reading elsewhere (to get the header, or | |
# specific lines). I we don't do this, we have to call at least | |
# `flush` before setting 'bytes_available', and after updating | |
# line positions. | |
# Download file to local storage. | |
def write_chunk() -> None: | |
chunk_len = len(chunk) | |
if chunk.count(b"\0") == chunk_len: | |
# If the input contains only zeros, don't actually | |
# write the file to disk, but seek to the immediate | |
# position. This way, we create a sparse file if the underlying | |
# filesystem supports it. This is much quiker, and we don't | |
# consume any actual disk space. We had such a situation with a | |
# lastlog file: | |
# https://en.wikipedia.org/wiki/Lastlog | |
f.seek(f.tell() + chunk_len) | |
f.truncate() | |
else: | |
f.write(chunk) | |
with open(self.local_path, "wb", buffering=0) as f: | |
async with aclosing(self._safe_download_file()) as iterator: | |
async for chunk in iterator: | |
# Write async, if the disk I/O becomes saturated, we still | |
# want to keep the event loop as responsive as possible, so | |
# that incoming network requests gets handled quickly. | |
# (This is similar to what aiofiles would do.) | |
await run_in_executor(None, write_chunk) | |
# Search for line endings. | |
# (This is the most efficient implementation I found so | |
# far. `splitlines(True)` will split using univerval line | |
# endings, supporting \r, \n and \r\n, and it will keep the | |
# endings, so that we can map `len` to it.) | |
# NOTE: In case of utf-16, splitlines doesn't work | |
# correctly. It doesn't incorporate the \0 byte | |
# following \n into the line ending. This means that | |
# if we store the offsets like this, the next line | |
# will start with one or more zero bytes. | |
# >>> 'hello\nworld\n'.encode('utf-16') \ | |
# .splitlines()[1].decode('utf-16', 'ignore') | |
# '眀漀爀氀搀' | |
length_iterator = accumulate( | |
chain([byte_position], map(len, chunk.splitlines(True))) | |
) | |
next(length_iterator) # Skip first item. | |
line_positions.extend(length_iterator) | |
# Only keep the last index if the chunk actually ends on a | |
# line ending. | |
if chunk and not chunk.endswith((b"\r", b"\n")): | |
line_positions.pop() | |
# Update byte position. | |
byte_position += len(chunk) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment