Skip to content

Instantly share code, notes, and snippets.

@nanvel
Created September 11, 2015 16:37
Show Gist options
  • Save nanvel/3bdf8e577119b5d9dc4f to your computer and use it in GitHub Desktop.
Save nanvel/3bdf8e577119b5d9dc4f to your computer and use it in GitHub Desktop.
DDBTable class for DynamoDB in examples (http://nanvel.com/p/dynamodb)
import botocore.session
import logging
import re
import six
from functools import partial
logger = logging.getLogger()
class AmazonException(Exception):
def __init__(self, message, code='unknown'):
self.message = message
self.code = code
def __str__(self):
return self.message
class DDBException(AmazonException):
ITEM_ENCODE_ERROR = 'ItemEncodeError'
ITEM_DECODE_ERROR = 'ItemDecodeError'
class Botocore(object):
_session = None
def __init__(self, service, operation, region_name, endpoint_url=None):
session = Botocore._session
if session is None:
session = botocore.session.get_session()
Botocore._session = session
service = session.get_service(service_name=service)
self.endpoint = service.get_endpoint(
region_name=region_name, endpoint_url=endpoint_url)
self.operation = service.get_operation(operation_name=operation)
def call(self, **kwargs):
response, message = self.operation.call(endpoint=self.endpoint, **kwargs)
if response.status_code != 200:
raise AmazonException(
message='DynamoDB request error: {message}.'.format(
message=message.get('Error', {'Message': 'unknown'})['Message']),
code=message.get('Error', {'Code': 'unknown'})['Code'])
return message
class DDBField(object):
@classmethod
def _validate(cls, value):
raise NotImplementedError('Not implemented.')
@classmethod
def decode(cls, value):
try:
return cls._validate(value)
except (TypeError, ValueError):
raise DDBException(
message='Invalid value for {cls} decode.'.format(cls=cls.__name__),
code=DDBException.ITEM_DECODE_ERROR)
@classmethod
def encode(cls, value):
try:
return str(cls._validate(value))
except (TypeError, ValueError):
raise DDBException(
message='Invalid value for {cls} encode.'.format(cls=cls.__name__),
code=DDBException.ITEM_ENCODE_ERROR)
class DDBIntField(DDBField):
AMAZON_TYPE = 'N'
@classmethod
def _validate(cls, value):
if isinstance(value, int):
return value
return int(value)
class DDBInt_IntField(DDBField):
AMAZON_TYPE = 'S'
_REGEXP = re.compile('\d+_\d+')
@classmethod
def _validate(cls, value):
if not isinstance(value, str):
value = str(value)
if cls._REGEXP.match(value) is None:
raise ValueError('Int_Int required.')
return value
class DDBUUIDField(DDBField):
AMAZON_TYPE = 'S'
_UUID_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
@classmethod
def _validate(cls, value):
if not isinstance(value, str):
value = str(value)
if cls._UUID_REGEXP.match(value) is None:
raise ValueError('UUID required.')
return value
class DDBUUID_UUIDField(DDBField):
AMAZON_TYPE = 'S'
_VALUE_REGEXP = re.compile('[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}_[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
@classmethod
def _validate(cls, value):
if not isinstance(value, str):
value = str(value)
if cls._VALUE_REGEXP.match(value) is None:
raise ValueError('UUID_UUID required.')
return value
class DDBStrField(DDBField):
AMAZON_TYPE = 'S'
@classmethod
def _validate(cls, value):
if not isinstance(value, str):
value = str(value)
return value
class DDBTable(object):
TABLE_NAME = ''
REGION_NAME = 'us-west-2'
KEY_SCHEMA = []
LOCAL_SECONDARY_INDEXES = []
GLOBAL_SECONDARY_INDEXES = []
PROVISIONED_THROUGHPUT = {}
FIELDS = {}
def _get_table_name(self):
return self.TABLE_NAME
def _get_table_kwargs(self):
key_fields = set()
for key in self.KEY_SCHEMA:
key_fields.add(key['AttributeName'])
for index in self.LOCAL_SECONDARY_INDEXES:
for key in index['KeySchema']:
key_fields.add(key['AttributeName'])
for index in self.GLOBAL_SECONDARY_INDEXES:
for key in index['KeySchema']:
key_fields.add(key['AttributeName'])
attribute_definitions = []
for field_name in key_fields:
attribute_definitions.append({
'AttributeName': field_name,
'AttributeType': self.FIELDS[field_name].AMAZON_TYPE
})
kwargs = {
'TableName': self._get_table_name(),
'AttributeDefinitions': attribute_definitions,
'KeySchema': self.KEY_SCHEMA,
'ProvisionedThroughput': self.PROVISIONED_THROUGHPUT,
}
if getattr(self, 'LOCAL_SECONDARY_INDEXES', None):
kwargs['LocalSecondaryIndexes'] = self.LOCAL_SECONDARY_INDEXES
if getattr(self, 'GLOBAL_SECONDARY_INDEXES', None):
kwargs['GlobalSecondaryIndexes'] = self.GLOBAL_SECONDARY_INDEXES
return kwargs
def _get_endpoint_url(self):
return None
def _dynamodb(self, operation):
return Botocore(
service='dynamodb', operation=operation,
region_name=self.REGION_NAME, endpoint_url=self._get_endpoint_url())
def create_table(self):
try:
message = self._dynamodb(operation='DescribeTable').call(
TableName=self._get_table_name())
except AmazonException as e:
if e.code != 'ResourceNotFoundException':
raise e
logger.warning('Creation {table_name} table ...'.format(
table_name=self._get_table_name()))
message = self._dynamodb(operation='CreateTable').call(
**self._get_table_kwargs())
else:
logger.warning('{table_name} table already exists.'.format(
table_name=self._get_table_name()))
def encode_item(self, data, keys=None, update=False):
if not data:
return {}
keys = keys or data.keys()
item = {}
for key in keys:
if key not in data:
continue
val = self.FIELDS[key].encode(value=data[key])
if update:
item[key] = {
'Value': {
self.FIELDS[key].AMAZON_TYPE: val
},
'Action': 'PUT'
}
else:
item[key] = {
self.FIELDS[key].AMAZON_TYPE: val
}
return item
def decode_item(self, item, keys=None):
data = {}
for key, val in six.iteritems(item):
if key not in self.FIELDS:
continue
if keys and key not in keys:
continue
data[key] = self.FIELDS[key].decode(
val[self.FIELDS[key].AMAZON_TYPE])
return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment