Skip to content

Instantly share code, notes, and snippets.

@zulrang
Last active January 8, 2024 23:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zulrang/d07c64b4f445c92351b4067544aa5346 to your computer and use it in GitHub Desktop.
Save zulrang/d07c64b4f445c92351b4067544aa5346 to your computer and use it in GitHub Desktop.
Async DynamoDB Batch Writer using aioboto3 (typed)
import asyncio

import aioboto3
from types_aiobotocore_dynamodb.service_resource import Table
class DynamoDBBatchWriter:
    """Buffer DynamoDB puts in memory and write them to a table in batches.

    Items passed to :meth:`put_item` are buffered and written when the buffer
    reaches ``max_batch_size`` items (25 by default), every ``flush_interval``
    seconds (5 by default), or when :meth:`flush` / :meth:`stop` is called.

    The constructor starts a background task with ``asyncio.create_task``, so
    an instance must be created while an event loop is running. Call
    :meth:`stop` before shutting down to end that task and write whatever is
    still buffered.

    This class is not thread-safe.
    """

    def __init__(
        self,
        table: Table,
        *,
        max_batch_size: int = 25,
        flush_interval: float = 5.0,
    ) -> None:
        """Create the writer and start the periodic flush task.

        Args:
            table: Table resource whose ``batch_writer()`` async context
                manager is used to perform the writes.
            max_batch_size: Number of buffered items that triggers an
                automatic flush from :meth:`put_item`.
            flush_interval: Seconds between periodic background flushes.

        Raises:
            ValueError: If ``max_batch_size`` is smaller than 1.
            RuntimeError: If no event loop is running (raised by
                ``asyncio.create_task``).
        """
        if max_batch_size < 1:
            raise ValueError("max_batch_size must be at least 1")
        self.table = table
        self._max_batch_size = max_batch_size
        self._flush_interval = flush_interval
        self._batch = []
        self._running = True
        # Serializes flushes so an explicit flush() cannot return while a
        # periodic (or size-triggered) flush is still writing.
        self._flush_lock = asyncio.Lock()
        # flush periodically
        self._periodic_task = asyncio.create_task(self._flush_periodically())

    async def _flush_periodically(self) -> None:
        """Flush the buffer every ``flush_interval`` seconds until stopped.

        An exception raised by a flush ends this task; the affected items stay
        buffered and are retried by the next flush.
        """
        while self._running:
            await asyncio.sleep(self._flush_interval)
            print(f"Writer status: {len(self._batch)} items in batch")
            await self._flush()

    async def put_item(self, item: dict) -> None:
        """Buffer ``item`` and flush once the buffer reaches the batch size."""
        self._batch.append(item)
        if len(self._batch) >= self._max_batch_size:
            await self._flush()

    async def _flush(self) -> None:
        """Write every currently buffered item through the table's batch writer.

        Raises:
            Whatever the batch writer raises. The items of the failed flush
            are put back at the front of the buffer first, so a later flush
            retries them (items already sent may be sent again).
        """
        async with self._flush_lock:
            if not self._batch:
                return
            # Detach the buffer before the first await so items added while
            # this flush is in progress go into the next batch.
            pending, self._batch = self._batch, []
            try:
                async with self.table.batch_writer() as batch_writer:
                    for item in pending:
                        # Await inside the context: every item must be handed
                        # to the batch writer before the context exits.
                        # Scheduling put_item as a background task would let
                        # the context close before the task ever runs.
                        await batch_writer.put_item(Item=item)
            except BaseException:
                # Also covers cancellation (see stop()): keep the items so
                # they are not silently dropped.
                self._batch[:0] = pending
                raise

    async def flush(self) -> None:
        """Write all buffered items and return once the write has finished."""
        await self._flush()

    async def stop(self) -> None:
        """Stop the periodic flush task and write any remaining items.

        Safe to call more than once.
        """
        self._running = False
        periodic = self._periodic_task
        if not periodic.done():
            # Usually this interrupts the sleep; if it interrupts a flush,
            # _flush() re-buffers the items and the final flush resends them.
            periodic.cancel()
        # return_exceptions=True keeps the task's CancelledError (or an
        # earlier flush failure) from aborting the final flush below, which
        # retries the same items and reports any persistent error itself.
        await asyncio.gather(periodic, return_exceptions=True)
        await self.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment