Skip to content

Instantly share code, notes, and snippets.

@ademidun
Last active November 7, 2019 16:33
Show Gist options
  • Save ademidun/74a6301d29dbc1af08ad38a90b35efd3 to your computer and use it in GitHub Desktop.
Save ademidun/74a6301d29dbc1af08ad38a90b35efd3 to your computer and use it in GitHub Desktop.
Some helpful Python code snippets for working with DynamoDB.
from botocore.exceptions import ClientError
def dict_to_dynamo(data):
    """Recursively replace empty strings with ``None`` so DynamoDB accepts the value.

    Saving an empty string to DynamoDB fails with:
    'An error occurred (ValidationException) when calling the BatchWriteItem
    operation: One or more parameter values were invalid: An AttributeValue
    may not contain an empty string'

    Dicts and lists are walked recursively and updated in place; the same
    container object is returned. Any other value is returned unchanged,
    except that ``""`` becomes ``None``.

    The reverse conversion is reportedly covered by
    https://gist.github.com/JamieCressey/a3a75a397db092d7a70bbe876a6fb817

    :param data: a dict, list, or scalar value to sanitise.
    :return: ``data`` with every empty string replaced by ``None``.
    """
    if isinstance(data, dict):
        # Only existing keys are reassigned, so the dict's size never changes
        # while it is being iterated.
        for key, value in data.items():
            data[key] = dict_to_dynamo(value)
    elif isinstance(data, list):
        for index, value in enumerate(data):
            data[index] = dict_to_dynamo(value)
    elif data == "":
        return None
    return data
def get_dynamo_items(table_name, table_filter=None, *args, **kwargs):
    """Read every page of a DynamoDB scan (or query) and return the items.

    The first page is fetched with ``table_filter`` as keyword arguments.
    While the response carries a ``LastEvaluatedKey``, the next page is
    requested with that key as ``ExclusiveStartKey``. Throttling errors are
    retried with a growing delay; any other ``ClientError`` is re-raised.

    If ``max_retry`` consecutive throttling errors occur, the loop stops and
    the items collected so far are returned (best effort); a message is
    printed so the truncation is not silent.

    :param table_name: name passed to ``dynamodb.Table``.
    :param table_filter: dict of keyword arguments for ``scan``/``query``;
        a falsy value is treated as ``{}``.
    :param args: accepted for compatibility; unused.
    :param kwargs: ``table_get_type='query'`` selects ``table.query``
        instead of the default ``table.scan``.
    :return: the result of ``dynamo_to_dict`` applied to all collected items.
    :raises ClientError: for any error code that is not a throttling code.
    """
    # Function-scope import so the retry back-off works even when the module
    # does not import ``sleep`` at file level.
    from time import sleep

    if not table_filter:
        table_filter = {}

    RETRY_EXCEPTIONS = ('ProvisionedThroughputExceededException',
                        'ThrottlingException')
    max_retry = 3

    table = dynamodb.Table(table_name)
    get_table_items = table.scan
    if kwargs.get('table_get_type') == 'query':
        get_table_items = table.query

    print('get_dynamo_items() get_table_items, table_filter', get_table_items, table_filter)
    response = get_table_items(**table_filter)
    items = response['Items']

    # todo watch for memory overflow in batching all the items as once
    # https://gist.github.com/shentonfreude/8d26ca1fc93fdb801b2c
    # https://github.com/boto/boto3/issues/597#issuecomment-323982159
    retries = 0
    while 'LastEvaluatedKey' in response and retries < max_retry:
        try:
            print('items retrieved:', len(items))
            response = get_table_items(ExclusiveStartKey=response['LastEvaluatedKey'], **table_filter)
            items.extend(response['Items'])
            # A successful page resets the back-off.
            retries = 0
        except ClientError as err:
            if err.response['Error']['Code'] not in RETRY_EXCEPTIONS:
                raise
            print('WHOA, too fast, slow it down retries={}, items={}'.format(retries, len(items)), )
            sleep(1.3 ** retries)
            retries += 1

    if retries >= max_retry:
        # The loop ended because of throttling, not because the table was
        # exhausted: the caller receives a partial result.
        print('get_dynamo_items() giving up after {} throttled retries, returning {} items'.format(
            retries, len(items)))

    return dynamo_to_dict(items)
# Query for next set of items
def query_some_items(table_name, META_CONFIG, META_KEY_MAP, limit=20, env="dev"):
    """Query the next page of items and persist where the page ended.

    The starting position is read with ``get_last_key`` from the meta table
    ``"<env>-<META_KEY_MAP['META_TABLE']>"``. One page of at most ``limit``
    items is queried from the ``<keyName>-index`` index for rows whose
    ``keyName`` attribute equals ``-1`` (the original comment calls this the
    "reserved value"). The response's ``LastEvaluatedKey`` (or ``None``) is
    then written back with ``set_last_key``.

    No retries are attempted: if throughput is exceeded the error propagates
    and the stored position is left for the next call to pick up.

    :param table_name: name of the table to query.
    :param META_CONFIG: dict holding at least ``'keyName'`` and the entry
        named by ``META_KEY_MAP['META_ID']``; it is also the base record
        handed to ``set_last_key``.
    :param META_KEY_MAP: dict mapping ``'META_TABLE'``, ``'META_ID'`` (and
        the keys used by ``get_last_key``/``set_last_key``) to names.
    :param limit: maximum number of items to request for this page.
    :param env: prefix for the meta table name; defaults to ``"dev"``,
        the value that was previously hard-coded.
    :return: the result of ``dynamo_to_dict`` applied to the page's items.
    """
    meta_table_name = "{0}-{1}".format(env, META_KEY_MAP["META_TABLE"])
    last_evaluated = get_last_key(META_CONFIG[META_KEY_MAP["META_ID"]], meta_table_name, META_KEY_MAP)
    table = dynamodb.Table(table_name)

    # performance improved by querying for the reserved value, no retries,
    # if throughput exceeded will pick up next time
    query_kwargs = {
        'IndexName': META_CONFIG['keyName'] + '-index',
        'KeyConditionExpression': Key(META_CONFIG['keyName']).eq(-1),
        'Limit': limit,
    }
    if last_evaluated:
        # Resume after the last key seen by the previous call.
        query_kwargs['ExclusiveStartKey'] = last_evaluated
    response = table.query(**query_kwargs)

    last_evaluated = response.get('LastEvaluatedKey', None)
    set_last_key(META_CONFIG, meta_table_name, last_evaluated, META_KEY_MAP)

    return dynamo_to_dict(response.get('Items'))
def get_last_key(config_id, table_name, META_KEY_MAP):
    """Look up the stored last-evaluated key for one config record.

    Fetches the item whose ``META_KEY_MAP['META_ID']`` attribute equals
    ``config_id`` from ``table_name``.

    :param config_id: value of the record's id attribute.
    :param table_name: name of the meta table to read from.
    :param META_KEY_MAP: dict providing the ``'META_ID'`` and
        ``'META_CONFIG_VAL'`` attribute names.
    :return: the record's ``META_KEY_MAP['META_CONFIG_VAL']`` value, or
        ``None`` when the record or that attribute is missing.
    """
    meta_table = dynamodb.Table(table_name)
    lookup = {META_KEY_MAP["META_ID"]: config_id}
    stored = meta_table.get_item(Key=lookup).get('Item', None)
    return None if stored is None else stored.get(META_KEY_MAP["META_CONFIG_VAL"])
def set_last_key(last_eval_config, table_name, val, META_KEY_MAP):
    """Store ``val`` as the last-evaluated key on a copy of the config record.

    The caller's ``last_eval_config`` is not modified: a shallow copy gets
    the new key value plus an updated-at timestamp (integer seconds from
    ``time.time()``), and that copy is written with ``put_item``.

    :param last_eval_config: dict used as the base of the stored item.
    :param table_name: name of the meta table to write to.
    :param val: value stored under ``META_KEY_MAP['META_CONFIG_VAL']``.
    :param META_KEY_MAP: dict providing the ``'META_CONFIG_VAL'`` and
        ``'TABLE_UPDATED_AT'`` attribute names.
    :return: ``None``.
    """
    meta_table = dynamodb.Table(table_name)
    updated_at = int(time.time())
    record = last_eval_config.copy()
    record.update({
        META_KEY_MAP["META_CONFIG_VAL"]: val,
        META_KEY_MAP["TABLE_UPDATED_AT"]: updated_at,
    })
    meta_table.put_item(Item=record)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment