Skip to content

Instantly share code, notes, and snippets.

@mhihasan
Last active July 10, 2023 21:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhihasan/4dd0917758d0a3e9a3a58248ed201951 to your computer and use it in GitHub Desktop.
Save mhihasan/4dd0917758d0a3e9a3a58248ed201951 to your computer and use it in GitHub Desktop.
DynamoDB Parallel Scanner
import os
import time
import csv
from concurrent.futures import ThreadPoolExecutor, wait
import boto3
# Module-level low-level DynamoDB client, obtained from the boto3 resource.
# _scan_segment issues every scan request through this single object, so it
# is shared by all worker threads started in scan().
dynamo_client = boto3.resource("dynamodb").meta.client
def _scan_segment(segment_no, total_segments, **kwargs):
    """Scan one segment of a parallel DynamoDB scan and return all its items.

    Calls ``dynamo_client.scan`` repeatedly for the given segment, feeding
    each response's ``LastEvaluatedKey`` back as ``ExclusiveStartKey`` until
    a response arrives without one.

    Args:
        segment_no: Value passed as ``Segment`` to every ``scan`` call.
        total_segments: Value passed as ``TotalSegments`` to every ``scan``
            call.
        **kwargs: Additional keyword arguments forwarded unchanged to
            ``dynamo_client.scan``.

    Returns:
        A list containing the ``Items`` of every page, in the order the
        pages were received. Pages without items contribute nothing.
    """
    print(f"Scanning segment no: {segment_no}")
    items = []
    while True:
        result = dynamo_client.scan(Segment=segment_no, TotalSegments=total_segments, **kwargs)
        # A response is not guaranteed to carry an "Items" key (for example
        # when the caller passes Select="COUNT"), so read it defensively
        # instead of indexing and risking a KeyError.
        items.extend(result.get("Items") or [])
        last_evaluated_key = result.get("LastEvaluatedKey")
        if last_evaluated_key is None:
            # No continuation key: this segment is exhausted.
            break
        # kwargs is this call's own dict, so updating it here does not
        # affect the arguments of any other segment.
        kwargs["ExclusiveStartKey"] = last_evaluated_key
    print(f"total items in seg: {segment_no}", len(items))
    return items
def scan(**kwargs):
    """Scan a DynamoDB table in parallel, one worker thread per segment.

    Args:
        **kwargs: Keyword arguments forwarded to every segment's scan call.
            ``TotalSegments`` (default 10) is consumed here; it sets both
            the number of segments and the size of the thread pool.

    Returns:
        One list holding the items of all segments, concatenated in
        ascending segment order so the result layout is reproducible.

    Raises:
        Any exception raised while scanning a segment is re-raised here by
        ``future.result()``.
    """
    total_segments = kwargs.pop("TotalSegments", 10)
    with ThreadPoolExecutor(max_workers=total_segments) as executor:
        # A list (not a set) keeps the futures in segment order; iterating
        # a set of futures would yield them in arbitrary order.
        futures = [
            executor.submit(_scan_segment, segment_no=i, total_segments=total_segments, **kwargs)
            for i in range(total_segments)
        ]
        results = []
        for future in futures:
            # result() blocks until that segment has finished, so no
            # separate wait() call is needed.
            results.extend(future.result())
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment