Generator function that efficiently transforms an iterator yielding data chunks of arbitrary sizes into an iterator yielding chunks of a specified exact size, except possibly for the last chunk.
from typing import TypeVar, Iterable, Generator, Callable, cast, Sequence


T = TypeVar('T', bound=Sequence)


def exact_size_chunks_iter(
    chunks_iter: Iterable[T],
    chunk_size: int,
    concat: Callable[[Iterable[T]], T] = cast(Callable[[Iterable[T]], T], b''.join),
) -> Generator[T, None, None]:
""" | |
Generator function that efficiently transforms an iterator yielding data | |
chunks of arbitrary sizes into an iterator yielding chunks of a specified | |
exact size, except possibly for the last chunk. | |
Using an optimal buffering algorithm this function generates new objects | |
only when forming and yielding the chunks - concatenating original chunks | |
in a single operation, or yielding their slices. | |
Args: | |
chunks_iter (Iterable[T]): An iterator that yields chunks of data of | |
arbitrary sizes. | |
chunk_size (int): The exact size for the chunks to be yielded. The last | |
chunk may be smaller if the total size of data is not | |
a multiple of chunk_size. | |
concat (function): A function that concatenates the chunks of data. | |
Default is bytes concatenation. The function should | |
take an iterable of chunks and return a single chunk. | |
Yields: | |
T: Chunks of data of size `chunk_size`, except possibly for the last | |
chunk. | |
Examples: | |
Upload large files to Amazon S3 in fixed-size chunks (similar to | |
s3_client.upload_fileobj() with response.raw, which can not be used in | |
some scenarios): | |
import boto3 | |
import requests | |
response = requests.get("http://example.com/largefile", stream=True) | |
response.raise_for_status() | |
s3 = boto3.resource('s3') | |
s3_object = s3.Bucket("mybucket").Object("myobject") | |
mpu = s3_object.initiate_multipart_upload() | |
parts = [] | |
try: | |
for i, chunk in enumerate( | |
exact_size_chunks_iter( | |
response.iter_content(), | |
chunk_size=32 * (2 ** 20) # 32MB | |
), | |
start=1, | |
): | |
part = mpu.Part(i).upload(Body=chunk) | |
parts.append({"PartNumber": i, "ETag": part["ETag"]}) | |
mpu.complete(MultipartUpload={"Parts": parts}) | |
except Exception as e: | |
mpu.abort() | |
raise e | |
Retransmit HTTP response over UDP with exact payload size: | |
import socket | |
import requests | |
url = "http://example.com/live_video_feed" | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
sock.connect(("127.0.0.1", 12345)) | |
for chunk in exact_size_chunks_iter( | |
requests.get(url, stream=True).iter_content(), | |
chunk_size=512 | |
): | |
sock.send(chunk) | |
Working with unbalanced Parquet datasets (ex. AWS VPC Flowlogs, which | |
could have tons of files ranging from 1 record per file to millions of | |
records in a single row group), this function can be used to compact | |
the dataset into a single file with a row groups of a specified size: | |
import pyarrow as pa | |
import pyarrow.parquet as pq | |
def concat_batches(batches): | |
t = pa.Table.from_batches(batches) | |
t = t.combine_chunks() | |
b = t.to_batches() | |
assert len(b) == 1 | |
return b[0] | |
def batch_iterator(fragments): | |
for i in fragments: | |
yield from i.to_batches() | |
ds = pq.ParquetDataset(input_path) | |
with pq.ParquetWriter(output_filename, schema=ds.schema) as f: | |
for batch in exact_size_chunks_iter( | |
batch_iterator(ds.fragments), 128000, | |
concat=concat_batches | |
): | |
f.write_batch(batch) | |
""" | |
    buf: list[T] = []
    missing_bytes = chunk_size
    for i in chunks_iter:
        if len(i) >= missing_bytes:
            # first, send what is already in buffer
            if buf:
                buf.append(cast(T, i[:missing_bytes]))
                yield concat(buf)
                buf.clear()
            # or just send the first chunk from `i`
            else:
                yield cast(T, i[:missing_bytes])
            # now iterate over extra full chunks from `i`
            start = missing_bytes
            end = start + chunk_size
            while end <= len(i):
                yield cast(T, i[start:end])
                start, end = end, end + chunk_size
            # finally, store the remainder in buffer
            if start < len(i):
                buf.append(cast(T, i[start:]))
            # and update the missing bytes counter
            missing_bytes = end - len(i)
        elif len(i) > 0:
            # chunk is smaller than missing bytes, just store it in buffer
            buf.append(i)
            missing_bytes -= len(i)
    # don't forget to yield the remainder
    if len(buf) == 1:
        yield buf[0]
    elif len(buf) > 1:
        yield concat(buf)
import os
import random

from exact_size_chunks_iter import exact_size_chunks_iter


def test_exact_size_chunks_iter_random():
    buf = [
        os.urandom(random.randint(1, 99))
        for i in range(50)
    ]
    buf_s = b''.join(buf)
    for chunk_size in [1, 10, 100, 1000, 10000]:
        ret = list(exact_size_chunks_iter(buf, chunk_size))
        for i in ret[:-1]:
            assert len(i) == chunk_size
        assert len(ret[-1]) <= chunk_size
        assert b''.join(ret) == buf_s


def test_exact_size_chunks_iter_fixed():
    buf = [
        os.urandom(100)
        for i in range(30)
    ]
    buf.append(os.urandom(10))
    buf_s = b''.join(buf)
    for chunk_size in [1, 10, 100, 1000, 10000]:
        ret = list(exact_size_chunks_iter(buf, chunk_size))
        for i in ret[:-1]:
            assert len(i) == chunk_size
        assert len(ret[-1]) <= chunk_size
        assert b''.join(ret) == buf_s
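Two edge cases the tests above leave implicit could be pinned down with a sketch like this (a hypothetical addition, reusing the same imports as the existing tests):

    def test_exact_size_chunks_iter_edge_cases():
        # an empty input iterator yields no chunks at all
        assert list(exact_size_chunks_iter([], 10)) == []
        # total size is an exact multiple of chunk_size: every chunk is full
        buf = [os.urandom(25) for _ in range(4)]  # 100 bytes total
        ret = list(exact_size_chunks_iter(buf, 50))
        assert [len(c) for c in ret] == [50, 50]
        assert b''.join(ret) == b''.join(buf)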
One commenter asks: what about playing with itertools a little?
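One way to read that suggestion (a sketch, not the gist author's code, and bytes-only, since flattening turns byte strings into individual integers): chain the chunks together and cut off chunk_size bytes at a time with islice. It is shorter, but it iterates byte by byte, giving up the single-concatenation buffering the gist optimizes for.

    from itertools import chain, islice

    def exact_size_chunks_iter_naive(chunks_iter, chunk_size):
        # flatten the byte chunks into one iterator of single bytes (ints)
        flat = chain.from_iterable(chunks_iter)
        while True:
            # bytes() rebuilds a chunk from up to chunk_size integers
            chunk = bytes(islice(flat, chunk_size))
            if not chunk:
                return
            yield chunk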