Skip to content

Instantly share code, notes, and snippets.

@TomFaulkner
Created May 10, 2019 16:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TomFaulkner/6d445c85e55c97df920d8722d883109c to your computer and use it in GitHub Desktop.
Save TomFaulkner/6d445c85e55c97df920d8722d883109c to your computer and use it in GitHub Desktop.
DynamoDB (or other NoSQL) record upgrades
import logging
# Module-level logger named after this module; used to record upgrade failures.
LOGGER = logging.getLogger(__name__)
class SchemaUpgradeException(Exception):
    """Raised when a record cannot be brought to the current schema version."""
def upgrade(data, versions=None):
    """Upgrade a record (e.g. a Dynamo item) to the current schema version.

    Args:
        data: dict-like record. Its ``'version'`` key holds the schema
            version the record is currently at; a missing key means 0.
        versions: iterable of upgrade functions, each receiving the record
            and returning the upgraded record. ``versions[i]`` upgrades a
            record from version ``i`` to ``i + 1`` and must set
            ``'version'`` itself. The current schema version is the number
            of functions supplied. ``None`` or an empty iterable means
            there are no upgrades and the current version is 0.

    Returns:
        A two-tuple ``(upgraded, data)``: whether any upgrade ran, and the
        resulting record.

    Raises:
        SchemaUpgradeException: if the stored version is an integer outside
            ``0..len(versions)``, if an upgrade function raises, or if the
            record is not at the current version after the upgrades ran.

    Example:
        >>> def v1(data):
        ...     data['new_key'] = True
        ...     data['version'] = 1
        ...     return data
        >>> def v2(data):
        ...     data['great_key'] = 3
        ...     data['version'] = 2
        ...     return data
        >>> upgrade({}, [v1, v2])
        (True, {'new_key': True, 'version': 2, 'great_key': 3})
    """
    # Materialise the iterable: it is sized and sliced below, which a
    # generator or other one-shot iterable would not support.
    versions = list(versions) if versions else []
    current = len(versions)
    data_version = data.get('version', 0)
    if data_version == current:
        return False, data

    # A negative version would make the slice below count from the end and
    # silently run only the trailing upgrades; a version beyond the end has
    # no upgrade path at all. Reject both explicitly.
    if isinstance(data_version, int) and not 0 <= data_version <= current:
        raise SchemaUpgradeException(
            'Data version (%s) is outside the supported range 0-%s.'
            % (data_version, current)
        )

    try:
        # Only the upgrades the record has not yet received are applied.
        for version in versions[data_version:]:
            data = version(data)
    except Exception as e:
        LOGGER.exception('Failed to upgrade data to current schema.')
        raise SchemaUpgradeException(e) from e

    # Each upgrade function is responsible for bumping 'version'; verify
    # that the chain really ended on the current version.
    if data.get('version', 0) == current:
        return True, data
    raise SchemaUpgradeException(
        'Upgrades did not bring data to current version (%s).' % current
    )
from unittest import TestCase
from dynamo_upgrade import upgrade, SchemaUpgradeException
def v1(data):
    """Upgrade step to version 1: stamp the version and reset the pagebreak list."""
    data.update(version=1, narrativePagebreaks=[])
    return data
def v2(data):
    """Upgrade step to version 2: only the version stamp changes."""
    data.update({'version': 2})
    return data
def v3(data):
    """Upgrade step to version 3: stamp the version and add ``new_key``."""
    data.update(version=3, new_key=True)
    return data
def fail_func(data=None):
    """Upgrade step that always fails with ``KeyError``.

    Accepts the record argument passed to every upgrade step, so the failure
    seen by the caller is the ``KeyError`` raised here rather than a
    ``TypeError`` from a mismatched call signature. ``data`` is ignored.
    """
    raise KeyError()
def fail_improper_version_after_upgrade(data):
    """Upgrade step that misbehaves by stamping the record as version 0."""
    data.update(version=0)
    return data
# Ordered upgrade chain shared by the tests; its length (3) is the current version.
versions = [v1, v2, v3]
class TestDynamoUpgrade(TestCase):
    """Tests for ``upgrade``."""

    def test_upgrade_from_none_versioned(self):
        """An unversioned record receives every upgrade in the chain."""
        was_upgraded, result = upgrade({}, versions)
        expected = {'version': 3, 'narrativePagebreaks': [], 'new_key': True}
        self.assertDictEqual(expected, result)
        self.assertTrue(was_upgraded)

    def test_upgrade_from_non_zero_version(self):
        """A record already at version 1 skips the earlier upgrades, so its
        existing data is not overwritten."""
        record = {'version': 1, 'narrativePagebreaks': ['1', '2']}
        was_upgraded, result = upgrade(record, versions)
        expected = {
            'version': 3,
            'narrativePagebreaks': ['1', '2'],
            'new_key': True,
        }
        self.assertDictEqual(expected, result)
        self.assertTrue(was_upgraded)

    def test_upgrade_not_necessary(self):
        """A record at the current version is returned untouched."""
        was_upgraded, result = upgrade({'version': 3}, versions)
        self.assertDictEqual({'version': 3}, result)
        self.assertFalse(was_upgraded)

    def test_fail_during_upgrade_raises_exception(self):
        """An upgrade step that raises surfaces as SchemaUpgradeException."""
        self.assertRaises(
            SchemaUpgradeException,
            upgrade,
            {'version': 1},
            [v1, fail_func],
        )

    def test_failed_upgrade_raises_exception(self):
        """A chain that does not end on the current version raises."""
        self.assertRaises(
            SchemaUpgradeException,
            upgrade,
            {'version': 1},
            [v1, fail_improper_version_after_upgrade],
        )

    def test_handles_current_version_zero(self):
        """An empty versions list, or none at all, means nothing to upgrade."""
        # First call passes an explicit empty list, second omits the argument.
        for call_args in (({}, []), ({},)):
            upgraded, data = upgrade(*call_args)
            self.assertDictEqual(data, {})
            self.assertFalse(upgraded)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment