Last active
November 7, 2019 16:33
-
-
Save ademidun/74a6301d29dbc1af08ad38a90b35efd3 to your computer and use it in GitHub Desktop.
Some helpful Python code snippets for working with DynamoDB.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from botocore.exceptions import ClientError | |
def dict_to_dynamo(data):
    """Recursively replace empty strings with ``None`` so DynamoDB accepts the value.

    If you try to save an empty string to dynamo you will get the following error:
    'An error occurred (ValidationException) when calling the BatchWriteItem
    operation: One or more parameter values were invalid: An AttributeValue may
    not contain an empty string'

    Dicts and lists are walked recursively and updated IN PLACE; any other value
    is returned unchanged, except ``""`` which becomes ``None``.

    To go the other way see:
    https://gist.github.com/JamieCressey/a3a75a397db092d7a70bbe876a6fb817

    Args:
        data: A dict, list, or scalar value destined for DynamoDB.

    Returns:
        The same dict/list object (mutated), ``None`` for an empty string, or
        the original scalar.
    """
    if isinstance(data, dict):
        # Only values are reassigned (no keys added/removed), so mutating
        # while iterating over items() is safe.
        for key, value in data.items():
            data[key] = dict_to_dynamo(value)
    elif isinstance(data, list):
        for index, value in enumerate(data):
            data[index] = dict_to_dynamo(value)
    elif data == "":
        data = None
    return data
def get_dynamo_items(table_name, table_filter=None, *args, **kwargs):
    """Fetch all items from a DynamoDB table, following pagination.

    Issues a ``scan`` by default, or a ``query`` when called with
    ``table_get_type='query'``, and keeps requesting pages for as long as the
    response carries a ``LastEvaluatedKey``.

    Args:
        table_name: Name passed to ``dynamodb.Table``.
        table_filter: Optional dict of keyword arguments forwarded to every
            ``scan``/``query`` call. Falsy values are treated as ``{}``.
        *args: Accepted for backward compatibility; unused.
        **kwargs: Only ``table_get_type`` is read.

    Returns:
        ``dynamo_to_dict`` applied to the accumulated items. If throttling
        persists for ``max_retry`` consecutive attempts on the same page, the
        items gathered so far are returned (a PARTIAL result) and a warning is
        printed.

    Raises:
        ClientError: For any non-retryable error while paginating, and for any
            error raised by the very first request (which is not retried).
    """
    # Function-scope import so the back-off below works even though the
    # module does not import ``sleep`` at the top.
    from time import sleep

    if not table_filter:
        table_filter = {}

    retry_exceptions = ('ProvisionedThroughputExceededException',
                        'ThrottlingException')
    max_retry = 3

    table = dynamodb.Table(table_name)
    get_table_items = table.scan
    if kwargs.get('table_get_type') == 'query':
        get_table_items = table.query

    print('get_dynamo_items() get_table_items, table_filter', get_table_items, table_filter)
    response = get_table_items(**table_filter)
    items = response['Items']

    # todo watch for memory overflow in batching all the items as once
    # https://gist.github.com/shentonfreude/8d26ca1fc93fdb801b2c
    # https://github.com/boto/boto3/issues/597#issuecomment-323982159
    retries = 0
    while 'LastEvaluatedKey' in response:
        try:
            print('items retrieved:', len(items))
            response = get_table_items(ExclusiveStartKey=response['LastEvaluatedKey'], **table_filter)
        except ClientError as err:
            if err.response['Error']['Code'] not in retry_exceptions:
                raise
            print('WHOA, too fast, slow it down retries={}, items={}'.format(retries, len(items)), )
            retries += 1
            if retries >= max_retry:
                # Keep the historical best-effort behaviour (return what we
                # have) but make the truncation visible instead of silent.
                print('get_dynamo_items() giving up after {} throttled attempts; '
                      'returning partial result, items={}'.format(retries, len(items)))
                break
            # Exponential back-off: 1.3**0, 1.3**1, ... between attempts.
            sleep(1.3 ** (retries - 1))
        else:
            items.extend(response['Items'])
            # A successful page resets the consecutive-failure counter.
            retries = 0

    return dynamo_to_dict(items)
# Query for the next page of items, resuming from the persisted cursor
def query_some_items(table_name, META_CONFIG, META_KEY_MAP, limit=20):
    """Query one page of items from an index and persist the pagination cursor.

    Reads the previously stored ``LastEvaluatedKey`` from the meta table via
    ``get_last_key``, runs a single ``table.query`` against the
    ``<keyName>-index`` index for rows whose key equals ``-1``, stores the new
    cursor via ``set_last_key`` and returns the page converted with
    ``dynamo_to_dict``.

    Args:
        table_name: Name of the table to query.
        META_CONFIG: Config dict; ``'keyName'`` and the key named by
            ``META_KEY_MAP["META_ID"]`` are read from it, and the whole dict
            is handed to ``set_last_key``.
        META_KEY_MAP: Mapping of logical names (``"META_TABLE"``,
            ``"META_ID"``, ...) to actual attribute/table names.
        limit: Maximum number of items requested from DynamoDB.

    Returns:
        The result of ``dynamo_to_dict`` on the page's ``Items``.
    """
    meta_table_name = "{0}-{1}".format("dev", META_KEY_MAP["META_TABLE"])
    resume_key = get_last_key(META_CONFIG[META_KEY_MAP["META_ID"]], meta_table_name, META_KEY_MAP)
    table = dynamodb.Table(table_name)

    # performance improved by querying for the reserved value, no retries,
    # if throughput exceeded will pick up next time
    key_name = META_CONFIG['keyName']
    query_args = {
        'IndexName': key_name + '-index',
        'KeyConditionExpression': Key(key_name).eq(-1),
        'Limit': limit,
    }
    if resume_key:
        # Only resume when a cursor was stored; otherwise start from the top.
        query_args['ExclusiveStartKey'] = resume_key
    response = table.query(**query_args)

    # Persist the new cursor (None once the last page has been reached).
    set_last_key(META_CONFIG, meta_table_name, response.get('LastEvaluatedKey', None), META_KEY_MAP)

    return dynamo_to_dict(response.get('Items'))
def get_last_key(config_id, table_name, META_KEY_MAP):
    """Return the pagination cursor stored in the meta table, if any.

    Args:
        config_id: Value of the meta record's primary key.
        table_name: Name of the meta table to read from.
        META_KEY_MAP: Mapping that supplies the primary-key attribute name
            (``"META_ID"``) and the cursor attribute name
            (``"META_CONFIG_VAL"``).

    Returns:
        The stored cursor value, or ``None`` when the record does not exist
        or has no cursor attribute.
    """
    lookup_key = {META_KEY_MAP["META_ID"]: config_id}
    response = dynamodb.Table(table_name).get_item(Key=lookup_key)

    record = response.get('Item', None)
    if record is not None:
        return record.get(META_KEY_MAP["META_CONFIG_VAL"])
    return None
def set_last_key(last_eval_config, table_name, val, META_KEY_MAP):
    """Persist a pagination cursor and an update timestamp to the meta table.

    A shallow copy of ``last_eval_config`` is written with ``put_item``, so
    the caller's dict is not modified.

    Args:
        last_eval_config: Existing meta record (dict) to base the write on.
        table_name: Name of the meta table to write to.
        val: Cursor value to store (``None`` clears the cursor).
        META_KEY_MAP: Mapping that supplies the cursor attribute name
            (``"META_CONFIG_VAL"``) and the timestamp attribute name
            (``"TABLE_UPDATED_AT"``).
    """
    # Function-scope import so the timestamp below works even though the
    # module does not import ``time`` at the top.
    import time

    meta_table = dynamodb.Table(table_name)
    now_seconds_epoch_utc = int(time.time())

    config_object = last_eval_config.copy()
    config_object[META_KEY_MAP["META_CONFIG_VAL"]] = val
    config_object[META_KEY_MAP["TABLE_UPDATED_AT"]] = now_seconds_epoch_utc
    meta_table.put_item(Item=config_object)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment