Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jayme-github/61a052b6d14c9b4f7a6e3bc53c36539d to your computer and use it in GitHub Desktop.
Save jayme-github/61a052b6d14c9b4f7a6e3bc53c36539d to your computer and use it in GitHub Desktop.
from itertools import chain, imap, groupby
from operator import itemgetter
empty = object()
def lookup_merge_strategy(tv, path):
# FIXME: Implement
pass
def are_equal_types(*iterables):
# func sliceElementType
# Returns True if all elements of N list(s) are of the same type but not
# list(). If the types are different, or list(), returns False.
types = chain(*(map(type, x) for x in iterables))
try:
first = next(types)
if first == type(list):
return False
return all(first == rest for rest in types)
except StopIteration:
return True
def uniqify_and_sort(s):
# func uniqifyAndSortScalars
return list(imap(itemgetter(0), groupby(sorted(s))))
def diff_list_of_dicts(original, modified, merge_key, ignore_mods, ignore_dels):
# Returns a (recursive) strategic merge patch that yields modified when applied to original,
# for a pair of lists of maps with merge semantics.
patch = []
originalSorted = sortMergeListsByNameArray(original, merge_key, false)
modifiedSorted = sortMergeListsByNameArray(modified, merge_key, false)
def diff_list_of_scalars(original, modified):
# Returns a (recursive) strategic merge patch that yields modified when applied to original,
# for a pair of lists of scalars with merge semantics.
if not modified:
# There is no need to check the length of original because there is no way to create
# a patch that deletes a scalar from a list of scalars with merge semantics.
return []
patch = []
original = uniqify_and_sort(original)
modified = uniqify_and_sort(modified)
orig_idx = mod_idx = 0
while mod_idx < len(modified) and orig_idx < len(original):
mod_idx += 1
while orig_idx < len(original):
orig_idx += 1
orig_s = str(original[orig_idx])
mod_s = str(modified[mod_idx])
if orig_s != mod_s:
patch.append(modified[mod_idx])
break
# Add any remaining items found only in modified
while mod_idx < len(modified):
mod_idx += 1
patch.append(modified[mod_idx])
return patch
def diff_list(original, modified, merge_key, ignore_mods, ignore_dels):
if not original:
if not modified or ignore_mods:
return []
return modified
assert are_equal_types(original, modified), "not equal types, or list"
if isinstance(original[0], dict):
return diff_list_of_dicts(original, modified, merge_key, ignore_mods, ignore_dels)
elif not ignore_mods:
return diff_list_of_scalars(original, modified)
return []
def diff_dict(original, modified, tv, ignore_mods, ignore_dels, path=""):
patch = {}
for k, mv in modified.items():
path = path + "." + k if path else k
ov = original.get(k, empty)
if ov is empty:
# key was added, so add to patch
if not ignore_mods:
patch[k] = mv
continue
if k == "$patch":
assert all([isinstance(ov, str), isinstance(mv, str)]), "invalid value for special key '{}'".format(k)
if mv != ov:
patch["$patch"] = mv
continue
# Types have changed, so add to patch
if type(ov) != type(mv):
if not ignore_mods:
path[k] = mv
continue
# Types are the same, so compare values
if isinstance(ov, dict):
pv = diff_dict(ov, mv, ignore_mods, ignore_dels)
if pv:
patch[k] = pv
continue
if isinstance(ov, list):
ms = lookup_merge_strategy(tv, path)
if ms is not None:
if ms.patch_strategy == "merge":
pv = diff_list(ov, mv, ms.merge_key, ignore_mods, ignore_dels)
if pv:
patch[k] = pv
continue
if not ignore_mods:
if ov != mv:
# values are different, so add to patch
patch[k] = mv
if not ignore_dels:
# Add Nones for deleted values
for k in original:
if k not in modified:
patch[k] = None
return patch
def find_dict_in_list_by_key_value(m, key, value):
for i, v in enumerate(m):
if not isinstance(v, dict):
return None, 0, False
try:
if v[key] == value:
return v, i, True
except KeyError:
pass
return None, 0, False
def merge_list(original, patch, elemType, mergeKey):
# Merge two lists together.
if len(original) == 0 and len(patch) == 0:
return original
# All the values must be of the same type, but not a list.
assert are_equal_types(original, patch), "not equal types, or list"
# If the elements are not dicts, merge the slices of scalars.
if not isinstance(original[0], dict):
# Maybe in the future add a "concat" mode that doesn't
# uniqify.
return uniqify_and_sort(original + patch)
if mergeKey == "":
raise ValueError("cannot merge lists without merge key for type %s",
elemType)
# First look for any special $patch elements.
patchWithoutSpecialElements = list()
replace = False
for v in patch:
patchType = v.get("$patch")
if patchType is not None:
if patchType == "delete":
try:
mergeValue = v[mergeKey]
except KeyError:
raise Exception("delete patch type with no merge key defined")
originalMap, originalKey, found = find_dict_in_list_by_key_value(original, mergeKey, mergeValue)
if found:
# Delete the element at originalKey.
del(original[originalKey])
elif patchType == "replace":
replace = True
elif patchType == "merge":
raise ValueError("merging lists cannot yet be specified in the patch")
else:
raise ValueError("unknown patch type: %s in: %s",
patchType, v)
else:
patchWithoutSpecialElements.append(v)
if replace:
return patchWithoutSpecialElements
patch = patchWithoutSpecialElements
# Merge patch into original.
for v in patch:
# Because earlier we confirmed that all the elements are maps.
try:
mergeValue = v[mergeKey]
except KeyError:
raise Exception("dict %s does not contain declared merge key: %s",
v, mergeKey)
# If we find a value with this merge key value in original, merge the
# dicts. Otherwise append onto original.
originalMap, originalKey, found = find_dict_in_list_by_key_value(original, mergeKey, mergeValue)
if found:
# Merge into original.
mergedMaps = merge_dict(originalMap, v, elemType)
original[originalKey] = mergedMaps
else:
original.append(v)
return original
def merge_dict(original, patch, tv):
# Merge fields from a patch dict into the original dict.
if patch.get("$patch") == "replace":
# If the patch contains "$patch: replace", don't merge it, just use the
# patch directly.
del(patch["$patch"])
return patch
elif patch.get("$patch") == "delete":
# If the patch contains "$patch: delete", don't merge it, just return
# an empty dict.
return dict()
else:
raise ValueError("unknown patch type: %s in: %s",
path.get("$patch"), patch)
# None is an accepted value for original to simplify logic in other places.
# If original is None, replace it with an empty map and then apply the patch.
if original is None:
original = dict()
# Start merging the patch into the original
for k, patchV in patch.items():
# If the value of this key is None, delete the key if it exists in the
# original. Otherwise, skip it.
if patchV is None:
if k in original:
del(original[k])
continue
if k not in original:
# If it's not in the original document, just take the patch value.
original[k] = patchV
continue
# If they're both dicts or lists, recurse into the value.
if type(original[k]) == type(patchV):
# First find the fieldPatchStrategy and fieldPatchMergeKey.
f_tv, fieldPatchStrategy, fieldPatchMergeKey = lookup_merge_strategy(tv, k)
if isinstance(original[k], dict) and fieldPatchStrategy != "replace":
original[k] = merge_dict(original[k], patchV, f_tv)
continue
if isinstance(original[k], list) and fieldPatchStrategy == "merge":
original[k] = merge_list(original[k], patchV, f_tv, fieldPatchMergeKey)
continue
# If types are different OR the types are both dict or list but we're
# just supposed to replace them, just take the value from patch.
original[k] = patchV
return original
def create_three_way_merge_patch(original, modified, current, tv):
# The patch is the difference from current to modified without deletions, plus deletions
# from original to modified. To find it, we compute deletions, which are the deletions from
# original to modified, and delta, which is the difference from current to modified without
# deletions, and then apply delta to deletions as a patch, which should be strictly additive.
# :param original: original configuration of the object from the
# annotation ('last-applied-configuration') or None if no annotation was
# found
# :param modified: new configuration that needs to be applied
# :param current: current configuration as active on the server
if original is None:
original = dict()
delta = diff_dict(current, modified, tv, False, True)
deletions = diff_dict(original, modified, tv, True, False)
patch = merge_dict(deletions, delta, tv)
return patch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment