Skip to content

Instantly share code, notes, and snippets.

@luizbraga
Last active May 23, 2019 21:16
Show Gist options
  • Save luizbraga/f44a9e7001bda0e40663c18948a1d5d5 to your computer and use it in GitHub Desktop.
Save luizbraga/f44a9e7001bda0e40663c18948a1d5d5 to your computer and use it in GitHub Desktop.
Python Dynamo Interface Class
from abc import ABC, abstractmethod
from copy import deepcopy
from boto3.dynamodb.conditions import Attr
class DynamoQuery:
def __init__(self, query_dict):
self.query_dict = query_dict
@property
def query(self):
key_condition = None
for key in self.query_dict:
if not key_condition:
key_condition = Attr(key).eq(self.query_dict[key])
else:
key_condition = key_condition & Attr(key).eq(self.query_dict[key])
return key_condition
def get_query(self):
query = self.query
return query
def get_update_query(self, update_data):
update_dict = {}
for key in update_data:
update_dict[key] = {'Value': self.query_dict[key], 'Action': 'PUT'}
return update_dict
class DynamoQueryResults:
def __init__(self, data):
self.data = data
@property
def items(self):
return self.data.get('Items', [])
class DynamoApiInterface(ABC):
def __init__(self, table_name):
self.table_name = table_name
@property
@abstractmethod
def dynamo_conn(self):
"""
dynamo_conn should return an instance of a boto3 DynamoDB connection
Example of a valid return:
return boto3.resource('dynamodb', region_name='us-east-1')
"""
@property
def dynamo_table(self):
if not self._table:
self._table = self.dynamo_conn.Table(self.table_name)
return self._table
def get(self, item_keys):
return self.dynamo_table.get_item(Key=item_keys)
def filter(self, **query_dict):
dynamo_query = DynamoQuery(query_dict)
data = self.dynamo_table.scan(
FilterExpression=dynamo_query.get_query()
)
return DynamoQueryResults(data)
def all(self, **query_dict):
return self.filter()
def insert(self, item_dict):
self.dynamo_table.put_item(Item=item_dict)
return item_dict
def remove(self, item_keys):
return self.dynamo_table.delete_item(Key=item_keys)
def remove_field(self, item_keys, field_name):
attr_update = {
field_name: {'Action': 'DELETE'}
}
return self.dynamo_table.update_item(
Key=item_keys,
AttributeUpdates=attr_update)
def update(self, item_keys, item_dict, **kwargs):
dynamo_query = DynamoQuery(item_dict)
update_dict = {}
if kwargs:
update_keys = kwargs
else:
update_keys = deepcopy(item_dict)
[update_keys.pop(k, None) for k in self.key_fields]
update_dict = dynamo_query.get_update_query(update_keys)
return self.dynamo_table.update_item(
Key=item_keys,
AttributeUpdates=update_dict)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment