Created April 30, 2012 23:00
-
-
Save iandanforth/2563472 to your computer and use it in GitHub Desktop.
Boto DynamoDB batch_write and batch_get wrappers that overcome the per-request limits of 25 and 100 items, respectively.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def unlimitedBatchRead(logger, db, keys, batch_size=100):
    '''
    Takes a list of N keys and fetches their items in requests of
    batch_size (default 100) keys or less. 100 is the max per call key
    count as imposed by Amazon.

    Keys the table does not contain are simply absent from the result.
    Keys of a chunk that DynamoDB reports as unprocessed (e.g. under
    throttling) are re-requested until the table no longer reports any.
    No backoff is applied between retries.

    * logger - A logger; used to warn when keys have to be retried
    * db - A boto Table object
    * keys - A list of item hash keys. It is NOT modified.
    * batch_size - Max keys per request; must be between 1 and 100

    Returns a list of the items DynamoDB returned.
    Raises ValueError if batch_size < 1.
    '''
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    conn = db.layer2
    hash_key = db.schema.hash_key_name
    # Work on a copy so the caller's list is left intact.
    pending = list(keys)
    finalItems = []
    while pending:
        # Always advance through the key list chunk by chunk, so a chunk whose
        # keys are all absent from the table cannot stop later chunks from
        # being requested.
        chunk = pending[:batch_size]
        pending = pending[batch_size:]
        bl = conn.new_batch_list()
        bl.add_batch(db, keys=chunk)
        # Make the request
        rv = conn.batch_get_item(bl)
        pItems = rv.get('Responses', {}).get(db.name, {}).get('Items', [])
        finalItems.extend(pItems)
        # A key missing from the response is either absent from the table or
        # was not processed. Only when DynamoDB flags unprocessed keys for
        # this table is it worth asking again. .get() is used because the
        # table has no entry in UnprocessedKeys when everything was processed.
        if (rv.get('UnprocessedKeys') or {}).get(db.name):
            # A plain list (not a set): a chunk holds at most batch_size keys,
            # and key values only need to support ==.
            returned = [item[hash_key] for item in pItems]
            retry = [k for k in chunk if k not in returned]
            if retry:
                logger.warning('%d keys were not processed; retrying them.',
                               len(retry))
                # Put them at the front so they are retried next.
                pending = retry + pending
    return finalItems
def unlimitedBatchWrite(logger, db, items, batch_size=25):
    '''
    Takes a list of N items and writes them in requests of batch_size
    (default 25) items or less. 25 is the max per call item count as
    imposed by Amazon. This is a recursive method that will handle the
    case where some puts are returned as unprocessed: those items are
    re-sent by a recursive call until none remain. No backoff is applied
    between retries.

    * logger - A logger; used to warn when items have to be retried
    * db - A boto Table object
    * items - A list of boto Item objects
    * batch_size - Max puts per request; must be between 1 and 25

    Raises ValueError if batch_size < 1.
    '''
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    conn = db.layer2
    # range() stops before len(items), so no empty request is ever sent
    # (an empty BatchWriteItem request is rejected by DynamoDB). A
    # `while index <= len(items)` loop would send one whenever len(items)
    # is a multiple of batch_size, including an empty list.
    for index in range(0, len(items), batch_size):
        subset = items[index:index + batch_size]
        bwl = conn.new_batch_write_list()
        bwl.add_batch(db, puts=subset)
        # Send this batch
        rv = conn.batch_write_item(bwl)
        unpItems = rv.get('UnprocessedItems') or {}
        if unpItems:
            logger.warning('Some items we sent were not processed! We will recurse '
                           'until they all are.')
            # Only puts are sent above, so only PutRequests can come back.
            for things in unpItems.values():
                tryAgainItems = [thing['PutRequest']['Item'] for thing in things
                                 if 'PutRequest' in thing]
                # Recurse
                unlimitedBatchWrite(logger, db, tryAgainItems, batch_size)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
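I think that on line 48 (`while index <= len(items):`) the comparison should be `<`, not `<=`. Otherwise an extra, empty batch is sent whenever the number of items is a multiple of 25, including an empty list.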