Skip to content

Instantly share code, notes, and snippets.

@trilom
Created March 2, 2020 13:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save trilom/f1ea07ab34eeef1690024267cd1c8784 to your computer and use it in GitHub Desktop.
Save trilom/f1ea07ab34eeef1690024267cd1c8784 to your computer and use it in GitHub Desktop.
ddb wrapper
```python
import json
import logging
from datetime import date
from datetime import datetime
import boto3
from dynamodb_json import json_util as json_ddb
# Module-level logger shared by the helper functions in this file.
logger = logging.getLogger('aws.ddb')
# Logger.warn() is a deprecated alias of Logger.warning(); use the supported name.
logger.warning('Creating aws.ddb object...')
def json_serial(obj):
    """Fallback serializer meant to be passed as ``default=`` to a JSON dumper.

    :param obj: the object the encoder could not serialize on its own
    :returns: ``obj.isoformat()`` when ``obj`` is a ``datetime`` or ``date``
    :raises TypeError: for every other type
    """
    logger.debug('in json_serial()...')
    # Guard clause: anything that is not a date/datetime is rejected outright.
    if not isinstance(obj, (datetime, date)):
        raise TypeError('Type %s not serializable' % type(obj))
    return obj.isoformat()
# this turns a regular dict into a dynamo dict
def dumps(input, **kwargs):
    """Dump a dict to JSON in DynamoDB format.

    All keyword arguments are forwarded to ``json_ddb.dumps``.  ``json_serial``
    is used as the ``default`` serializer unless the caller supplies their own.

    :param input: the dict to dump
    :param as_dict: (forwarded keyword) return the result as a python dict
        (useful for the DynamoDB boto3 library) instead of a JSON string
    :returns: DynamoDB json format.
    """
    # setdefault rather than a hard-coded ``default=`` argument: passing both
    # would raise "got multiple values for keyword argument 'default'".
    kwargs.setdefault('default', json_serial)
    return json_ddb.dumps(input, **kwargs)
def loads(input, **kwargs):
    """Convert DynamoDB-formatted JSON into a plain python object.

    Thin pass-through to ``json_ddb.loads``; every keyword argument is
    forwarded unchanged.

    :param input: the json string, or a dict when the ``as_dict`` keyword is
        set to True, to convert
    :returns: python dict object
    """
    plain = json_ddb.loads(input, **kwargs)
    return plain
# check ddb dict
def checkDDBDict(target, optional=True):
    """Normalize a dict (or JSON string) argument into DynamoDB-JSON form.

    :param target: a dict, a JSON string, or ``None``
    :param optional: when ``False``, a ``None`` target is rejected
    :returns: the object obtained by dumping ``target`` with ``json_ddb.dumps``
        and parsing the result back with ``json.loads``; ``None`` and values
        that are neither dict nor str are returned unchanged
    :raises Exception: ``{'error': 'DDBDict not defined.'}`` when ``target`` is
        ``None`` and ``optional`` is ``False``
    """
    if target is None:
        if optional is False:
            raise Exception({'error': 'DDBDict not defined.'})
        return None
    if isinstance(target, str):
        # A string is assumed to be JSON text: parse it first, then convert.
        target = json.loads(target)
    elif not isinstance(target, dict):
        # Anything else is handed back untouched for the caller to deal with.
        return target
    # isinstance (rather than an exact type() match) so dict subclasses such as
    # OrderedDict are converted too instead of slipping through unconverted.
    return json.loads(json_ddb.dumps(target))
# check dict
def checkDict(target, optional=True):
    """Validate a plain-dict argument, parsing it from JSON text if needed.

    :param target: a dict, a JSON string, or ``None``
    :param optional: when ``False``, a ``None`` target is rejected
    :returns: the parsed object when ``target`` is a string, otherwise
        ``target`` unchanged (including ``None``)
    :raises Exception: ``{'error': 'Dict not defined.'}`` when ``target`` is
        ``None`` and ``optional`` is ``False``
    """
    if target is None and optional is False:
        raise Exception({'error': 'Dict not defined.'})
    # isinstance (not an exact type() match) so str subclasses are parsed too.
    if isinstance(target, str):
        # A string is assumed to be JSON text; no DynamoDB conversion happens here.
        return json.loads(target)
    return target
# check string variables
def checkString(target, values=None, optional=True):
    """Validate a string argument and coerce it to ``str``.

    :param target: the value to check; ``None`` means "not supplied"
    :param values: optional collection of allowed values
    :param optional: when ``False``, a ``None`` target is rejected
    :returns: ``target`` as a string, or ``None`` when it was not supplied and
        is optional
    :raises Exception: ``{'error': ...}`` when a supplied value is not in
        ``values``, when a required value is missing, or when the value cannot
        be converted to a string
    """
    if values is not None and target not in values:
        # A supplied value must always be in the allow-list.  A missing value
        # is only an error for required arguments.  Previously the allow-list
        # was skipped entirely whenever ``optional`` was left at its default.
        if target is not None or optional is False:
            raise Exception({'error': f'Target:{target} not in values defined{values}.'})
    if target is None:
        if optional is False:
            raise Exception({'error': 'String not defined.'})
        return None
    if not isinstance(target, str):
        try:
            target = str(target)
        except Exception as exception:
            raise Exception({'error': f'Cant convert {target} to string {exception}'}) from exception
    return target
# check table variable
def checkTable(target):
    """Return ``target`` when it looks like a usable table name.

    :param target: the table name to check
    :returns: ``target`` unchanged when it is not ``None`` and longer than one
        character
    :raises Exception: ``{'error': 'Table not defined.'}`` otherwise
    """
    has_name = target is not None and len(target) > 1
    if has_name:
        return target
    # Missing, empty, or single-character names are treated as "not defined".
    raise Exception({'error': 'Table not defined.'})
def process_paginator(paginatorResponse):
    """Collect the items of every page of a paginated response into one list.

    :param paginatorResponse: iterable of pages, each indexable by ``'Items'``
    :returns: list built from ``json_ddb.loads(page['Items'])`` of every page,
        round-tripped through ``json.dumps``/``json.loads`` with ``json_serial``
        as the fallback serializer
    :raises Exception: ``{'error': ...}`` wrapping any failure; the original
        exception is attached as ``__cause__``
    """
    data = []
    try:
        for page in paginatorResponse:
            data.extend(json_ddb.loads(page['Items']))
        # The dump/load round trip runs every value through the JSON encoder,
        # with json_serial handling the types it cannot serialize itself.
        return json.loads(json.dumps(data, default=json_serial))
    except Exception as e:
        # "from e" keeps the root cause on the wrapped exception.
        raise Exception({'error': f'Cant process paginator, exception{e}.'}) from e
def build_ue(data):
    """Build a ``set`` update expression from the keys of ``data``.

    Every key ``k`` contributes one ``#k = :k`` assignment; the placeholders
    match the names produced by ``build_ean`` and ``build_eav``.

    :param data: mapping whose keys are the attributes to set
    :returns: e.g. ``'set #a = :a,#b = :b'``
    :raises Exception: ``{'error': ...}`` when ``data`` is empty or cannot be
        iterated; the original exception is attached as ``__cause__``
    """
    try:
        assignments = [f'#{key} = :{key}' for key in data]
        if not assignments:
            # An empty mapping used to yield the bare, unusable expression 'set'.
            raise ValueError('no attributes to update')
        return 'set ' + ','.join(assignments)
    except Exception as e:
        raise Exception({'error': f'Cant build update expression, exception{e}.'}) from e
def build_ean(data):
    """Build the expression-attribute-names mapping for the keys of ``data``.

    :param data: mapping whose keys are attribute names
    :returns: ``{'#<key>': '<key>'}`` for every key, with keys rendered as text
    :raises Exception: ``{'error': ...}`` when ``data`` cannot be iterated; the
        original exception is attached as ``__cause__``
    """
    try:
        return {f'#{key}': f'{key}' for key in data}
    except Exception as e:
        raise Exception({'error': f'Cant build expression attribute names, exception{e}.'}) from e
def build_eav(data):
    """Build the expression-attribute-values mapping for ``data``.

    :param data: mapping of attribute name to new value
    :returns: ``{':<key>': '<value>'}`` for every item, with each value
        rendered as text
    :raises Exception: ``{'error': ...}`` when ``data`` cannot be read as a
        mapping; the original exception is attached as ``__cause__``
    """
    try:
        # NOTE(review): every value is formatted into a string, so numbers and
        # booleans reach the caller as text. Kept as-is because callers may
        # rely on it - confirm whether native types were intended.
        return {f':{key}': f'{value}' for key, value in data.items()}
    except Exception as e:
        raise Exception({'error': f'Cant build expression attribute values, exception{e}.'}) from e
class Dynamo():
    """Convenience wrapper around the boto3 DynamoDB client.

    Each public method validates its arguments with the module's ``check*``
    helpers, drops optional arguments that were not supplied, and then calls
    the matching client operation.  Failures are logged once and re-raised as
    ``Exception({'error': <message>})`` with the original exception chained:
    argument problems are reported as ``Validation error: ...`` and problems
    during the client call under an operation-specific message.
    """

    def __init__(self):
        self.logging = logging.getLogger('aws.ddb.Dynamo')
        # Logger.warn() is a deprecated alias of Logger.warning().
        self.logging.warning('Creating aws.ddb.Dynamo object...')
        self.client = self.connect_ddb()

    def connect_ddb(self):
        """Create and return the boto3 ``dynamodb`` client.

        :raises Exception: ``{'error': ...}`` when the client cannot be created
        """
        try:
            return boto3.client('dynamodb')
        except Exception as e:
            self._fail(f'Cannot connect to AWS DDB, exception:{e}', e)

    def _fail(self, message, cause):
        """Log ``message`` as an error and raise it as ``Exception({'error': message})``."""
        self.logging.error(message)
        raise Exception({'error': message}) from cause

    @staticmethod
    def _drop_none(payload):
        """Return ``payload`` without the entries whose value is ``None`` (unsupplied optionals)."""
        return {name: value for name, value in payload.items() if value is not None}

    @staticmethod
    def _succeeded(ddbResponse):
        """Return True when the response metadata reports HTTP status 200."""
        return ddbResponse.get('ResponseMetadata', {}).get('HTTPStatusCode') == 200

    # TODO: HANDLE MORE OPTIONS FOR UPDATE_ITEM
    def update_item(self, table, key, ean=None, eav=None, ue=None, data=None, return_values=None):
        """Update one item.

        :param table: string - table to update the item in
        :param key: dict - key of the item to update, e.g. ``{'event_id': 'some_event_id'}``
        :param ean: dict - attribute-name aliases, e.g. ``{'#mode': 'mode'}``
        :param eav: dict (or DDB dict) - expression values, e.g. ``{':mode': 'mode'}``
        :param ue: string - update expression, e.g. ``'set verified = :verified'``
        :param data: dict - when given, whichever of ``ue``/``ean``/``eav`` is
            ``None`` is generated from it with ``build_ue``/``build_ean``/``build_eav``
        :param return_values: string - checked against
            NONE | ALL_OLD | UPDATED_OLD | ALL_NEW | UPDATED_NEW
        :returns: the ddbResponse object; requested return values are inside it
        :raises Exception: ``{'error': ...}`` on validation or client failure
        """
        return_value_options = ['NONE', 'ALL_OLD', 'UPDATED_OLD', 'ALL_NEW', 'UPDATED_NEW']
        try:
            if data is not None:
                ue = build_ue(data) if ue is None else ue
                ean = build_ean(data) if ean is None else ean
                eav = build_eav(data) if eav is None else eav
            payload = self._drop_none({
                'TableName': checkTable(table),
                'Key': checkDDBDict(key, optional=False),
                'UpdateExpression': checkString(ue),
                'ExpressionAttributeNames': checkDict(ean),
                'ExpressionAttributeValues': checkDDBDict(eav),
                'ReturnValues': checkString(return_values, values=return_value_options),
            })
        except Exception as e:
            self._fail(f'Validation error: {e}.', e)
        # The client call has its own handler so service errors are no longer
        # re-wrapped (and logged a second time) as validation errors.
        try:
            ddbResponse = self.client.update_item(**payload)
            if self._succeeded(ddbResponse):
                self.logging.debug(f'Updated item in {table}')
            else:
                self.logging.error(f'Issue updating in {table}.')
            return ddbResponse
        except Exception as e:
            self._fail(f'Error updating item in table: {e}.', e)

    # https://pypi.org/project/dynamodb-json/
    # TODO: HANDLE MORE OPTIONS FOR PUT_ITEM
    def put_item(self, table, item):
        """Put one item.

        :param table: string - table to put into
        :param item: string or dict - json object (required)
        :returns: the ddbResponse object
        :raises Exception: ``{'error': ...}`` on validation or client failure
        """
        try:
            # Both entries are required, so there is nothing optional to drop.
            payload = {
                'TableName': checkTable(table),
                'Item': checkDDBDict(item, False),
            }
        except Exception as e:
            self._fail(f'Validation error: {e}.', e)
        try:
            ddbResponse = self.client.put_item(**payload)
            if self._succeeded(ddbResponse):
                self.logging.debug(f'Added to {table}')
            else:
                self.logging.error(f'Issue adding to {table}.')
            return ddbResponse
        except Exception as e:
            self._fail(f'Error putting in table: {e}.', e)

    # TODO: HANDLE MORE OPTIONS FOR SCAN_TABLE
    def scan_table(self, table, pe=None, fe=None, eav=None, ean=None, consistent=False):
        """Scan a table through the ``scan`` paginator and return all items.

        :param table: string - table name
        :param pe: string - projection expression: values you want returned
        :param fe: string - filter expression applied to the scan results
            (scan pulls all, then filters)
        :param eav: dict (or DDB dict) - expression values, e.g. ``{':mode': 'mode'}``
        :param ean: dict - attribute-name aliases, e.g. ``{'#mode': 'mode'}``
        :param consistent: consistent read; true only when its text form,
            lower-cased, equals ``'true'``
        :returns: list of items as produced by ``process_paginator``
        :raises Exception: ``{'error': ...}`` on validation or client failure
        """
        try:
            payload = self._drop_none({
                'TableName': checkTable(table),
                'ProjectionExpression': checkString(pe),
                'FilterExpression': checkString(fe),
                'ExpressionAttributeNames': checkDict(ean),
                'ExpressionAttributeValues': checkDDBDict(eav),
                'ConsistentRead': str(consistent).lower() == 'true',
            })
        except Exception as e:
            self._fail(f'Validation error: {e}.', e)
        try:
            paginator_scan = self.client.get_paginator('scan')
            ddbResponse = paginator_scan.paginate(**payload)
            return process_paginator(ddbResponse)
        except Exception as e:
            self._fail(f'Error scanning table: {e}.', e)

    # TODO: HANDLE MORE OPTIONS FOR GET_ITEM
    def get_item(self, table, key, pe=None, ean=None, consistent=False, return_consumed_capacity=None):
        """Get one item by key.

        :param table: string - table name
        :param key: dict - key of the item
        :param pe: string - projection expression: values you want returned
        :param ean: dict - attribute-name aliases, e.g. ``{'#mode': 'mode'}``
        :param consistent: consistent read; true only when its text form,
            lower-cased, equals ``'true'``
        :param return_consumed_capacity: string - aggregation precision,
            checked against INDEXES | TOTAL | NONE
        :returns: the item as a plain dict when the status is 200, otherwise
            the raw ddbResponse
        :raises Exception: ``{'error': ...}`` on validation or client failure,
            including a response that carries no ``'Item'`` entry
        """
        return_consumed_capacity_options = ['INDEXES', 'TOTAL', 'NONE']
        try:
            payload = self._drop_none({
                'TableName': checkTable(table),
                'Key': checkDDBDict(key, optional=False),
                'ProjectionExpression': checkString(pe),
                'ExpressionAttributeNames': checkDict(ean),
                'ConsistentRead': str(consistent).lower() == 'true',
                'ReturnConsumedCapacity': checkString(return_consumed_capacity, values=return_consumed_capacity_options),
            })
        except Exception as e:
            self._fail(f'Validation error: {e}.', e)
        try:
            ddbResponse = self.client.get_item(**payload)
            if self._succeeded(ddbResponse):
                self.logging.debug(f'Getting item in {table}')
                # A response without 'Item' raises KeyError here and is reported
                # through the error path below.
                # NOTE(review): presumably that is the "no such item" case -
                # consider returning None for it instead.
                return json.loads(json.dumps(loads(ddbResponse['Item']), default=json_serial))
            self.logging.error(f'Issue getting item in {table}.')
            return ddbResponse
        except Exception as e:
            self._fail(f'Error getting item in table: {e}.', e)

    # TODO: HANDLE MORE OPTIONS FOR DELETE_ITEM
    def delete_item(self, table, key, ean=None, eav=None, ce=None, return_consumed_capacity=None, return_values=None):
        """Delete one item by key.

        :param table: string - table name
        :param key: dict - key of the item
        :param ean: dict - attribute-name aliases, e.g. ``{'#mode': 'mode'}``
        :param eav: dict (or DDB dict) - expression values, e.g. ``{':mode': 'mode'}``
        :param ce: string - condition expression: conditions for the delete
        :param return_consumed_capacity: string - aggregation precision,
            checked against INDEXES | TOTAL | NONE
        :param return_values: string - checked against
            NONE | ALL_OLD | UPDATED_OLD | ALL_NEW | UPDATED_NEW
        :returns: the ddbResponse object
        :raises Exception: ``{'error': ...}`` on validation or client failure
        """
        return_value_options = ['NONE', 'ALL_OLD', 'UPDATED_OLD', 'ALL_NEW', 'UPDATED_NEW']
        return_consumed_capacity_options = ['INDEXES', 'TOTAL', 'NONE']
        try:
            payload = self._drop_none({
                'TableName': checkTable(table),
                'Key': checkDDBDict(key, optional=False),
                'ExpressionAttributeNames': checkDict(ean),
                'ExpressionAttributeValues': checkDDBDict(eav),
                # The condition comes from ``ce``; it used to be filled from
                # return_consumed_capacity, so ``ce`` was silently ignored.
                'ConditionExpression': checkString(ce),
                'ReturnConsumedCapacity': checkString(return_consumed_capacity, values=return_consumed_capacity_options),
                'ReturnValues': checkString(return_values, values=return_value_options),
            })
        except Exception as e:
            self._fail(f'Validation error: {e}.', e)
        try:
            ddbResponse = self.client.delete_item(**payload)
            if self._succeeded(ddbResponse):
                self.logging.debug(f'Deleting item in {table}')
            else:
                self.logging.error(f'Issue deleting item in {table}.')
            return ddbResponse
        except Exception as e:
            self._fail(f'Error deleting item in table: {e}.', e)
```
@trilom
Copy link
Author

trilom commented Mar 2, 2020

def get_webhooks():
    """Scan the webhook-subscription table and return the selected attributes.

    Example use of the wrapper's ``scan_table``; ``_ddb`` is presumably a
    ``Dynamo`` instance created elsewhere - TODO confirm.
    """
    scan_options = {
        'table': 'twitch-WebhookSubscriptionEvents-prod',
        'pe': 'topic, callbackUrl, lease_seconds, #mode, persist, verified, next_refresh, secret',
        # `mode` is requested through the #mode alias declared here.
        'ean': {'#mode': 'mode'},
        'consistent': True,
    }
    return _ddb.scan_table(**scan_options)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment