Created
March 2, 2020 13:22
-
-
Save trilom/f1ea07ab34eeef1690024267cd1c8784 to your computer and use it in GitHub Desktop.
ddb wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
```python | |
import json | |
import logging | |
from datetime import date | |
from datetime import datetime | |
import boto3 | |
from dynamodb_json import json_util as json_ddb | |
logger = logging.getLogger('aws.ddb') | |
logger.warn('Creating aws.ddb object...') | |
def json_serial(obj): | |
logger.debug('in json_serial()...') | |
if isinstance(obj, (datetime, date)): | |
return obj.isoformat() | |
raise TypeError('Type %s not serializable' % type(obj)) | |
# this turns a regular dict into a dynamo dict | |
def dumps(input, **kwargs): | |
""" Dump the dict to json in DynamoDB Format | |
You can use any other simplejson or json options | |
:param dct - the dict to dump | |
:param as_dict - returns the result as python dict (useful for DynamoDB boto3 library) or as json sting | |
:returns: DynamoDB json format. | |
""" | |
return json_ddb.dumps(input, default=json_serial, **kwargs) | |
def loads(input, **kwargs): | |
""" Loads dynamodb json format to a python dict. | |
:param s - the json string or dict (with the as_dict variable set to True) to convert | |
:returns python dict object | |
""" | |
return json_ddb.loads(input, **kwargs) | |
# check ddb dict | |
def checkDDBDict(target, optional=True): | |
if target is None and optional is False: | |
raise Exception({'error': 'DDBDict not defined.'}) | |
elif type(target) == dict: | |
# if dict dump to ddb string then load to object | |
target = json.loads(json_ddb.dumps(target)) | |
elif type(target) != dict: | |
if type(target) == str: | |
# if a string assume its a json string load to object, dump to ddb string, load to object | |
target = json.loads(json_ddb.dumps(json.loads(target))) | |
return target | |
# check ddb dict | |
def checkDict(target, optional=True): | |
if target is None and optional is False: | |
raise Exception({'error': 'Dict not defined.'}) | |
elif type(target) != dict: | |
if type(target) == str: | |
# if a string assume its a json string load to object, dump to ddb string, load to object | |
target = json.loads(target) | |
return target | |
# check string variables | |
def checkString(target, values=None, optional=True): | |
# check values | |
if values is not None: | |
if target not in values and optional is False: | |
raise Exception({'error': f'Target:{target} not in values defined{values}.'}) | |
# check optional | |
if target is None and optional is False: | |
# no target | |
raise Exception({'error': 'String not defined.'}) | |
elif target is None: | |
target = target | |
elif type(target) != str: | |
try: | |
target = str(target) | |
except Exception as exception: | |
raise Exception({'error': f'Cant convert {target} to string {exception}'}) | |
return target | |
# check table variable | |
def checkTable(target): | |
if target is None or len(target) <= 1: | |
# table not defined | |
raise Exception({'error': 'Table not defined.'}) | |
return target | |
def process_paginator(paginatorResponse): | |
data = [] | |
try: | |
for page in paginatorResponse: | |
data.extend(json_ddb.loads(page['Items'])) | |
return json.loads(json.dumps(data, default=json_serial)) | |
except Exception as e: | |
raise Exception({'error': f'Cant process paginator, exception{e}.'}) | |
def build_ue(data): | |
# make ue | |
ue = 'set ' | |
try: | |
for key in data.keys(): | |
ue += f'#{key} = :{key},' | |
# trim last char from ue, trailing comma | |
return ue[:-1] | |
except Exception as e: | |
raise Exception({'error': f'Cant build update expression, exception{e}.'}) | |
def build_ean(data): | |
# make ean | |
ean = {} | |
try: | |
for key in data.keys(): | |
ean[f'#{key}'] = f'{key}' | |
return ean | |
except Exception as e: | |
raise Exception({'error': f'Cant build expression attribute names, exception{e}.'}) | |
def build_eav(data): | |
# make eav | |
eav = {} | |
try: | |
for key in data.keys(): | |
eav[f':{key}'] = f'{data[key]}' | |
return eav | |
except Exception as e: | |
raise Exception({'error': f'Cant build expression attribute values, exception{e}.'}) | |
class Dynamo(): | |
def __init__(self): | |
self.logging = logging.getLogger('aws.ddb.Dynamo') | |
self.logging.warn('Creating aws.ddb.Dynamo object...') | |
self.client = self.connect_ddb() | |
def connect_ddb(self): | |
try: | |
client = boto3.client('dynamodb') | |
return client | |
except Exception as e: | |
error = f'Cannot connect to AWS DDB, exception:{e}' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
# table - string - table to update item | |
# key - dict - dict of key you want to update { 'event_id': 'some_event_id' } | |
# ue - string - expression of values you want to alter 'set verified = :verified' | |
# ean - dict - dict of values that might be protected { '#mode': 'mode' } | |
# eav - dict(or DDB dict) - dict of values to replace for filter expression{ ':mode': 'mode' } | |
# return_values - string ALL_OLD | UPDATED_OLD | ALL_NEW | UPDATED_NEW | |
# | |
# returns ddbResponse object, if you have return values they will be here | |
# TODO: HANDLE MORE OPTIONS FOR UPDATE_ITEM | |
def update_item(self, table, key, ean=None, eav=None, ue=None, data=None, return_values=None): | |
return_value_options = ['ALL_OLD', 'UPDATED_OLD', 'ALL_NEW', 'UPDATED_NEW'] | |
try: | |
if data is not None: | |
ue = (build_ue(data) if ue is None else ue) | |
ean = (build_ean(data) if ean is None else ean) | |
eav = (build_eav(data) if eav is None else eav) | |
payload = { | |
'TableName': checkTable(table), | |
'Key': checkDDBDict(key, optional=False), | |
'UpdateExpression': checkString(ue), | |
'ExpressionAttributeNames': checkDict(ean), | |
'ExpressionAttributeValues': checkDDBDict(eav), | |
'ReturnValues': checkString(return_values, values=return_value_options), | |
} | |
# remove any keys that are none (optional) | |
for key in list(payload.keys()): | |
if payload[key] is None: | |
payload.pop(key) | |
try: | |
ddbResponse = self.client.update_item(**payload) | |
if ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200: | |
self.logging.debug(f'Updated item in {table}') | |
return ddbResponse | |
else: | |
self.logging.error(f'Issue updating in {table}.') | |
return ddbResponse | |
except Exception as e: | |
error = f'Error updating item in table: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
except Exception as e: | |
error = f'Validation error: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
# | |
# table - string - table to put into | |
# item - string or dict - json object | |
# https://pypi.org/project/dynamodb-json/ | |
# | |
# returns ddbResponse object, if you have return values they will be here | |
# TODO: HANDLE MORE OPTIONS FOR PUT_ITEM | |
def put_item(self, table, item): | |
try: | |
# item not optional | |
payload = { | |
'TableName': checkTable(table), | |
'Item': checkDDBDict(item, False), | |
} | |
# remove any keys that are none (optional) | |
# for key in payload.keys(): | |
# if payload[key] is None: | |
# payload.pop(key) | |
try: | |
ddbResponse = self.client.put_item(**payload) | |
if ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200: | |
self.logging.debug(f'Added to {table}') | |
return ddbResponse | |
else: | |
self.logging.error(f'Issue adding to {table}.') | |
return ddbResponse | |
except Exception as e: | |
error = f'Error putting in table: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
except Exception as e: | |
error = f'Validation error: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
# table - string - table name | |
# pe - string - valuse you want to return from table | |
# fe - string - expression of values you want to filter from scan results (scan pulls all then filters) | |
# ean - dict - dict of values that might be protected { '#mode': 'mode' } | |
# eav - dict(or DDB dict) - dict of values to replace for filter expression{ ':mode': 'mode' } | |
# | |
# returns list of items | |
# TODO: HANDLE MORE OPTIONS FOR SCAN_TABLE | |
def scan_table(self, table, pe=None, fe=None, eav=None, ean=None, consistent=False): | |
try: | |
payload = { | |
'TableName': checkTable(table), | |
'ProjectionExpression': checkString(pe), | |
'FilterExpression': checkString(fe), | |
'ExpressionAttributeNames': checkDict(ean), | |
'ExpressionAttributeValues': checkDDBDict(eav), | |
'ConsistentRead': (True if str(consistent).lower() == 'true' else False), | |
} | |
# remove any keys that are none (optional) | |
for key in list(payload.keys()): | |
if payload[key] is None: | |
payload.pop(key) | |
try: | |
paginator_scan = self.client.get_paginator('scan') | |
# scan all users for vehicleid | |
ddbResponse = paginator_scan.paginate(**payload) | |
return process_paginator(ddbResponse) | |
except Exception as e: | |
error = f'Error scanning table: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
except Exception as e: | |
error = f'Validation error: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
# table - string | |
# key - dict | |
# consistent - bool - consistent read or not | |
# return_consumed_capacity - string - aggregation precision 'INDEXES'|'TOTAL'|'NONE' | |
# pe - string - valuse you want to return from table | |
# ean - dict - dict of values that might be protected { '#mode': 'mode' } | |
# | |
# returns item dict | |
# TODO: HANDLE MORE OPTIONS FOR GET_ITEM | |
def get_item(self, table, key, pe=None, ean=None, consistent=False, return_consumed_capacity=None): | |
return_consumed_capacity_options = ['INDEXES', 'TOTAL', 'NONE'] | |
try: | |
payload = { | |
'TableName': checkTable(table), | |
'Key': checkDDBDict(key, optional=False), | |
'ProjectionExpression': checkString(pe), | |
'ExpressionAttributeNames': checkDict(ean), | |
'ConsistentRead': (True if str(consistent).lower() == 'true' else False), | |
'ReturnConsumedCapacity': checkString(return_consumed_capacity, values=return_consumed_capacity_options), | |
} | |
# remove any keys that are none (optional) | |
for key in list(payload.keys()): | |
if payload[key] is None: | |
payload.pop(key) | |
try: | |
ddbResponse = self.client.get_item(**payload) | |
if ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200: | |
self.logging.debug(f'Getting item in {table}') | |
# self.logging.debug(json.dumps(loads(ddbResponse['Item']), default=json_serial)) | |
return json.loads(json.dumps(loads(ddbResponse['Item']), default=json_serial)) | |
else: | |
self.logging.error(f'Issue getting item in {table}.') | |
return ddbResponse | |
except Exception as e: | |
error = f'Error getting item in table: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
except Exception as e: | |
error = f'Validation error: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
# table - string | |
# key - dict | |
# return_consumed_capacity - string - aggregation precision 'INDEXES'|'TOTAL'|'NONE' | |
# ean - dict - dict of values that might be protected { '#mode': 'mode' } | |
# eav - dict(or DDB dict) - dict of values to replace for filter expression{ ':mode': 'mode' } | |
# ce - string - conditional expression, conditions for your delete if column_val == 100 | |
# return_values - string ALL_OLD | UPDATED_OLD | ALL_NEW | UPDATED_NEW | |
# | |
# delete item by key | |
# TODO: HANDLE MORE OPTIONS FOR DELETE_ITEM | |
def delete_item(self, table, key, ean=None, eav=None, ce=None, return_consumed_capacity=None, return_values=None): | |
return_value_options = ['ALL_OLD', 'UPDATED_OLD', 'ALL_NEW', 'UPDATED_NEW'] | |
return_consumed_capacity_options = ['INDEXES', 'TOTAL', 'NONE'] | |
try: | |
payload = { | |
'TableName': checkTable(table), | |
'Key': checkDDBDict(key, optional=False), | |
'ExpressionAttributeNames': checkDict(ean), | |
'ExpressionAttributeValues': checkDDBDict(eav), | |
'ConditionExpression': checkString(return_consumed_capacity), | |
'ReturnConsumedCapacity': checkString(return_consumed_capacity, values=return_consumed_capacity_options), | |
'ReturnValues': checkString(return_values, values=return_value_options), | |
} | |
# remove any keys that are none (optional) | |
for key in list(payload.keys()): | |
if payload[key] is None: | |
payload.pop(key) | |
try: | |
ddbResponse = self.client.delete_item(**payload) | |
if ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200: | |
self.logging.debug(f'Deleting item in {table}') | |
return ddbResponse | |
else: | |
self.logging.error(f'Issue deleting item in {table}.') | |
return ddbResponse | |
except Exception as e: | |
error = f'Error deleting item in table: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
except Exception as e: | |
error = f'Validation error: {e}.' | |
self.logging.error(error) | |
raise Exception({'error': error}) | |
``` |
Author
trilom
commented
Mar 2, 2020
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment