@ei-grad
Last active October 6, 2023 18:58
Generator function that efficiently transforms an iterator yielding data chunks of arbitrary sizes into an iterator yielding chunks of a specified exact size, except possibly for the last chunk.
from typing import TypeVar, Iterable, Generator, Callable, cast, Sequence

T = TypeVar('T', bound=Sequence)


def exact_size_chunks_iter(
    chunks_iter: Iterable[T],
    chunk_size: int,
    concat: Callable[[Iterable[T]], T] = cast(Callable[[Iterable[T]], T], b''.join),
) -> Generator[T, None, None]:
"""
Generator function that efficiently transforms an iterator yielding data
chunks of arbitrary sizes into an iterator yielding chunks of a specified
exact size, except possibly for the last chunk.
Using an optimal buffering algorithm this function generates new objects
only when forming and yielding the chunks - concatenating original chunks
in a single operation, or yielding their slices.
Args:
chunks_iter (Iterable[T]): An iterator that yields chunks of data of
arbitrary sizes.
chunk_size (int): The exact size for the chunks to be yielded. The last
chunk may be smaller if the total size of data is not
a multiple of chunk_size.
concat (function): A function that concatenates the chunks of data.
Default is bytes concatenation. The function should
take an iterable of chunks and return a single chunk.
Yields:
T: Chunks of data of size `chunk_size`, except possibly for the last
chunk.
Examples:
Upload large files to Amazon S3 in fixed-size chunks (similar to
s3_client.upload_fileobj() with response.raw, which can not be used in
some scenarios):
import boto3
import requests
response = requests.get("http://example.com/largefile", stream=True)
response.raise_for_status()
s3 = boto3.resource('s3')
s3_object = s3.Bucket("mybucket").Object("myobject")
mpu = s3_object.initiate_multipart_upload()
parts = []
try:
for i, chunk in enumerate(
exact_size_chunks_iter(
response.iter_content(),
chunk_size=32 * (2 ** 20) # 32MB
),
start=1,
):
part = mpu.Part(i).upload(Body=chunk)
parts.append({"PartNumber": i, "ETag": part["ETag"]})
mpu.complete(MultipartUpload={"Parts": parts})
except Exception as e:
mpu.abort()
raise e
Retransmit HTTP response over UDP with exact payload size:
import socket
import requests
url = "http://example.com/live_video_feed"
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(("127.0.0.1", 12345))
for chunk in exact_size_chunks_iter(
requests.get(url, stream=True).iter_content(),
chunk_size=512
):
sock.send(chunk)
Working with unbalanced Parquet datasets (ex. AWS VPC Flowlogs, which
could have tons of files ranging from 1 record per file to millions of
records in a single row group), this function can be used to compact
the dataset into a single file with a row groups of a specified size:
import pyarrow as pa
import pyarrow.parquet as pq
def concat_batches(batches):
t = pa.Table.from_batches(batches)
t = t.combine_chunks()
b = t.to_batches()
assert len(b) == 1
return b[0]
def batch_iterator(fragments):
for i in fragments:
yield from i.to_batches()
ds = pq.ParquetDataset(input_path)
with pq.ParquetWriter(output_filename, schema=ds.schema) as f:
for batch in exact_size_chunks_iter(
batch_iterator(ds.fragments), 128000,
concat=concat_batches
):
f.write_batch(batch)
"""
    buf: list[T] = []
    missing_bytes = chunk_size
    for i in chunks_iter:
        if len(i) >= missing_bytes:
            # first, send what is already in buffer
            if buf:
                buf.append(cast(T, i[:missing_bytes]))
                yield concat(buf)
                buf.clear()
            # or just send the first chunk from `i`
            else:
                yield cast(T, i[:missing_bytes])
            # now iterate over extra full chunks from `i`
            start = missing_bytes
            end = start + chunk_size
            while end <= len(i):
                yield cast(T, i[start:end])
                start, end = end, end + chunk_size
            # finally, store the remainder in buffer
            if start < len(i):
                buf.append(cast(T, i[start:]))
            # and update the missing bytes counter
            missing_bytes = end - len(i)
        elif len(i) > 0:
            # chunk is smaller than missing bytes, just store it in buffer
            buf.append(i)
            missing_bytes -= len(i)
    # don't forget to yield the remainder
    if len(buf) == 1:
        yield buf[0]
    elif len(buf) > 1:
        yield concat(buf)
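A minimal usage sketch (the chunk values below are illustrative, not part of the gist), showing how arbitrary-sized input chunks are regrouped into fixed-size ones:

    >>> chunks = [b'abc', b'defgh', b'ij']
    >>> list(exact_size_chunks_iter(chunks, chunk_size=4))
    [b'abcd', b'efgh', b'ij']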
import os
import random

from exact_size_chunks_iter import exact_size_chunks_iter


def test_exact_size_chunks_iter_random():
    buf = [
        os.urandom(random.randint(1, 99))
        for i in range(50)
    ]
    buf_s = b''.join(buf)
    for chunk_size in [1, 10, 100, 1000, 10000]:
        ret = list(exact_size_chunks_iter(buf, chunk_size))
        for i in ret[:-1]:
            assert len(i) == chunk_size
        assert len(ret[-1]) <= chunk_size
        assert b''.join(ret) == buf_s


def test_exact_size_chunks_iter_fixed():
    buf = [
        os.urandom(100)
        for i in range(30)
    ]
    buf.append(os.urandom(10))
    buf_s = b''.join(buf)
    for chunk_size in [1, 10, 100, 1000, 10000]:
        ret = list(exact_size_chunks_iter(buf, chunk_size))
        for i in ret[:-1]:
            assert len(i) == chunk_size
        assert len(ret[-1]) <= chunk_size
        assert b''.join(ret) == buf_s
daskol commented Oct 6, 2023

What about playing with itertools a little?

from typing import TypeVar, Iterable, Generator, Callable, cast, Sequence

from itertools import chain
from sys import version_info

if version_info >= (3, 12):
    from itertools import batched
else:
    from itertools import islice

    def batched(iterable, n):
        # batched('ABCDEFG', 3) --> ABC DEF G
        if n < 1:
            raise ValueError('n must be at least one')
        it = iter(iterable)
        while batch := tuple(islice(it, n)):
            yield batch

T = TypeVar('T', bound=Sequence)

DEFAULT_CONCAT = cast(Callable[[Iterable[T]], T], bytes)


def exact_size_chunks_iter(
    chunks_iter: Iterable[T],
    chunk_size: int,
    concat: Callable[[Iterable[T]], T] = DEFAULT_CONCAT,
) -> Generator[T, None, None]:
    flatted = chain.from_iterable(chunks_iter)
    batches = batched(flatted, n=chunk_size)
    yield from (concat(b) for b in batches)
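With the default bytes concatenation this variant regroups byte chunks the same way (illustrative values, not from the thread):

    >>> list(exact_size_chunks_iter([b'abc', b'defgh', b'ij'], chunk_size=4))
    [b'abcd', b'efgh', b'ij']

Note that here `concat` receives a batch of individual elements (ints, for bytes input) rather than a tuple of original chunks, so a chunk-level concatenation function such as `concat_batches` from the Parquet example would likely need changes to work with it.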

ei-grad commented Oct 6, 2023

Worse asymptotic complexity: chain.from_iterable flattens the input into individual elements, so the loop runs once per byte instead of once per chunk.
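A rough way to quantify that (a hypothetical timing sketch, not from the thread; `exact_size_chunks_iter_itertools` stands for the itertools-based variant above, renamed here to avoid clashing with the original function):

    import timeit

    # 16 MiB of data delivered in 64 KiB chunks (illustrative sizes)
    chunks = [b'x' * 65536 for _ in range(256)]

    # buffering version: slices and joins whole chunks, O(number of chunks)
    print(timeit.timeit(
        lambda: list(exact_size_chunks_iter(chunks, chunk_size=1 << 20)),
        number=10,
    ))

    # itertools version: iterates over every single byte in Python,
    # O(total number of bytes)
    print(timeit.timeit(
        lambda: list(exact_size_chunks_iter_itertools(chunks, chunk_size=1 << 20)),
        number=10,
    ))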
