Skip to content

Instantly share code, notes, and snippets.

@edvardm
Created May 15, 2023 10:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save edvardm/bea1cde8d48cb648b4a579154a2a8647 to your computer and use it in GitHub Desktop.
Save edvardm/bea1cde8d48cb648b4a579154a2a8647 to your computer and use it in GitHub Desktop.
Add items one at a time; entries are buffered and flushed automatically once enough of them have accumulated.
from typing import Any, TypeVar, Callable, Generator
from contextlib import contextmanager
T = TypeVar("T")
class BufferedOp:
    """Collect elements in a buffer and hand them to a callback in batches.

    Elements are added one at a time with `add()`. As soon as the buffer
    holds at least `buffer_size` elements it is passed to the `flusher`
    callback and replaced with a fresh, empty list.

    Note that the last batch is usually smaller than `buffer_size` and is
    therefore not flushed automatically. Either call `flush()` explicitly
    when done, or use the `buffering()` context manager below, which calls
    `flush()` on exit.
    """

    def __init__(self, buffer_size: int, flusher: Callable[[list[T]], Any]):
        """Create an empty buffer.

        Args:
            buffer_size: Number of buffered elements that triggers an
                automatic flush from `add()`.
            flusher: Callback that receives the list of buffered elements.
                Its return value is ignored.
        """
        self.buffer_size = buffer_size
        self._buffer: list[T] = []
        self._flush = flusher

    def add(self, element: T) -> None:
        """Append `element` to the buffer, flushing once the buffer is full."""
        self._buffer.append(element)
        if len(self._buffer) >= self.buffer_size:
            self.flush()

    def flush(self) -> None:
        """Pass the buffered elements to the flusher and reset the buffer.

        Does nothing when the buffer is empty, so the callback never
        receives an empty batch (for example when the number of added
        elements is an exact multiple of `buffer_size`).

        Any exception raised by the callback propagates to the caller; in
        that case the elements stay in the buffer.
        """
        if not self._buffer:
            return
        self._flush(self._buffer)
        # Rebind instead of clearing in place: the callback may have kept a
        # reference to the list it was given, and that list must not change.
        self._buffer = []
@contextmanager
def buffering(
    flusher: Callable[[list[T]], None], buffer_size: int
) -> Generator[BufferedOp, None, None]:
    """Yield a `BufferedOp` and flush it when the `with` block exits.

    Args:
        flusher: Callback handed to the `BufferedOp`; it receives each
            batch of buffered elements.
        buffer_size: Buffer size handed to the `BufferedOp`.

    Yields:
        The `BufferedOp` instance to add elements to.

    The final `flush()` sits in a `finally` clause, so it runs whether the
    body of the `with` block finishes normally or raises.

    Example:
        with buffering(save_rows, buffer_size=100) as buf:
            for row in rows:
                buf.add(row)
    """
    pending = BufferedOp(buffer_size=buffer_size, flusher=flusher)
    try:
        yield pending
    finally:
        # Push out whatever is left over from the last, partial batch.
        pending.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment