Skip to content

Instantly share code, notes, and snippets.

@jmehnle
Last active June 4, 2020 21:23
Show Gist options
  • Save jmehnle/fbcbb450641d7582841bc765de291ffc to your computer and use it in GitHub Desktop.
Save jmehnle/fbcbb450641d7582841bc765de291ffc to your computer and use it in GitHub Desktop.
import gzip
import io
from typing import Iterator, Optional
class BytesReader:
    """Read-only, file-like adapter over an iterator of bytes chunks.

    Chunks are pulled from the source lazily, only as far as needed to
    satisfy each ``read()`` call.  Any surplus from the last chunk pulled is
    kept in ``self.buffer`` and served first on the next call.
    """

    def __init__(self, bytes_generator: Iterator) -> None:
        """Wrap *bytes_generator*, an iterator (or any iterable) of bytes chunks."""
        # iter() returns iterators/generators unchanged and additionally lets
        # plain iterables such as lists be used as the chunk source.
        self.bytes_generator = iter(bytes_generator)
        self.buffer: bytes = b""

    def read(self, size: Optional[int] = 1024 ** 2) -> bytes:
        """Return up to *size* bytes from the stream.

        Args:
            size: Maximum number of bytes to return.  ``None`` or a negative
                value means "read everything that is left", following the
                usual file-object convention.

        Returns:
            At most *size* bytes.  Fewer bytes (possibly ``b""``) are
            returned only once the chunk source is exhausted.
        """
        read_all = size is None or size < 0

        # Collect chunks in a list and join once, instead of repeatedly
        # concatenating immutable bytes objects.
        pieces = [self.buffer]
        buffered = len(self.buffer)
        if read_all or buffered < size:
            for chunk in self.bytes_generator:
                pieces.append(chunk)
                buffered += len(chunk)
                if not read_all and buffered >= size:
                    break
        data = b"".join(pieces)

        if read_all:
            self.buffer = b""
            return data

        # Return data, but no more than requested; keep the rest for later.
        result, self.buffer = data[:size], data[size:]
        return result
class GzippingBytesReader:
    """Read-only, file-like adapter that gzips an iterator of bytes chunks.

    Uncompressed chunks are pulled lazily from the source, fed through a
    ``gzip.GzipFile`` that writes into an in-memory buffer, and handed out
    through ``read()``.  Concatenating everything returned by ``read()``
    until it returns ``b""`` yields one complete gzip stream.
    """

    class ByteArrayWriter(bytearray):
        """A bytearray with the ``write``/``flush`` methods GzipFile calls."""

        def write(self, data: bytes) -> None:
            """Append *data* to the end of the buffer."""
            self.extend(data)

        def flush(self) -> None:
            """Do nothing; written data is already in memory."""

    def __init__(
        self,
        filename: Optional[str] = None,
        bytes_generator: Optional[Iterator] = None,
    ) -> None:
        """Set up the compressor over *bytes_generator*.

        Args:
            filename: Passed to ``gzip.GzipFile`` as its ``filename``
                argument (the original file name for the gzip header).
            bytes_generator: Iterator (or any iterable) of uncompressed bytes
                chunks.  Required; it only carries a default because it
                follows the optional *filename* in the signature.

        Raises:
            TypeError: If *bytes_generator* is not supplied.
        """
        if bytes_generator is None:
            raise TypeError("bytes_generator is required")
        self.bytes_generator = iter(bytes_generator)
        # The nested class is a class attribute, so it must be reached
        # through self (a bare ``ByteArrayWriter`` is not in method scope).
        self.buffer = self.ByteArrayWriter()
        self.compressor = gzip.GzipFile(
            filename=filename, mode="wb", fileobj=self.buffer
        )
        self.compressor.flush()  # Flush gzip header.

    def read(self, size: Optional[int] = 1024 ** 2) -> bytes:
        """Return up to *size* bytes of gzip-compressed data.

        Args:
            size: Maximum number of compressed bytes to return.  ``None`` or
                a negative value means "compress and return everything that
                is left".

        Returns:
            At most *size* bytes.  Fewer bytes (possibly ``b""``) are
            returned only once the source is exhausted and the gzip stream
            has been finalized.
        """
        read_all = size is None or size < 0

        # Accumulate enough compressed data, as long as the source has any.
        # Once the compressor is closed the source is never touched again.
        while not self.compressor.closed and (
            read_all or len(self.buffer) < size
        ):
            try:
                chunk = next(self.bytes_generator)
            except StopIteration:
                # Source is exhausted: closing finalizes the gzip stream
                # into the buffer.
                self.compressor.close()
                break
            self.compressor.write(chunk)
            # Flush after every chunk so its compressed form lands in the
            # buffer right away instead of staying inside the compressor.
            self.compressor.flush()

        if read_all:
            size = len(self.buffer)

        # Return data, but no more than requested; drop it from the buffer.
        result = bytes(self.buffer[:size])
        del self.buffer[:size]
        return result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment