Skip to content

Instantly share code, notes, and snippets.

@iandanforth
Created April 30, 2012 23:00
Show Gist options
  • Save iandanforth/2563472 to your computer and use it in GitHub Desktop.
Save iandanforth/2563472 to your computer and use it in GitHub Desktop.
Boto DynamoDB batch_write and batch_get wrappers. Overcomes 25 and 100 item limits respectively.
def unlimitedBatchRead(logger, db, keys):
'''
Takes a list of N keys and breaks it into requests of 100 keys or less.
This is the max per call key count as imposed by Amazon.
* db - A boto Table object
* keys - A list of item hash keys
'''
conn = db.layer2
finalItems = []
prevKeysLen = len(keys)
while keys:
bl = conn.new_batch_list()
# Add in our batch of keys
bl.add_batch(db, keys = keys[:100])
# Make the request
rv = conn.batch_get_item(bl)
pItems = rv['Responses'][db.name]['Items']
finalItems.extend(pItems)
# Remove all keys we got from the list of keys
hash_key = db.schema.hash_key_name
for item in pItems:
keys.remove(item[hash_key])
keysLen = len(keys)
# If the db never returns an item for a key ,it doesn't have it.
# In that case we'll make two requests without the number of keys
# diminishing. Also we need to make sure this wasn't just a serious
# throttling situation by checking for unprocessed keys
if keysLen == prevKeysLen and not rv['UnprocessedKeys'][db.name]:
break
prevKeysLen = keysLen
return finalItems
def unlimitedBatchWrite(logger, db, items):
'''
Takes a list of N items and breaks it into requests of 25 items or less.
This is the max per call item count as imposed by Amazon. This is a
recursive method that will handle the case where some puts are returned
as unprocessed.
* db - A boto Table object
* items - A list of boto Item objects
'''
conn = db.layer2
step = 25
index = 0
while index <= len(items):
bwl = conn.new_batch_write_list()
subset = items[index:(index+step)]
bwl.add_batch(db, puts = subset)
# Send this batch
rv = conn.batch_write_item(bwl)
unpItems = rv['UnprocessedItems']
if unpItems:
logger.warn('Some items we sent were not processed! We will recurse '
'until they all are.')
for table, things in unpItems.iteritems():
tryAgainItems = [thing['PutRequest']['Item'] for thing in things]
# Recurse
unlimitedBatchWrite(logger, db, tryAgainItems)
index += step
@dennistocker
Copy link

I think that in line 48, it should be "<", not "<="

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment