$ cat a.json b.json
{
"a": {
"bb": [
1,
3,
4
],
"cc": {
"a": 11,
"b": 2,
"c": 3,
"d": 4
}
},
"common": 0,
"common2": 0,
"extra1": 3
}
{
"a": {
"bb": [
1,
2,
4
],
"cc": {
"a": 1,
"b": 2,
"e": 4,
"f": 3
}
},
"common": 0,
"common2": 1,
"extra2": 3
}
$ ./dict-compare.py a.json b.json -s LEFT
{
"a": {
"bb": [
1,
3,
4
],
"cc": {
"a": 11,
"b": 2,
"c": 3,
"d": 4
}
},
"common": 0,
"common2": 0,
"extra1": 3
}
$ ./dict-compare.py a.json b.json -s RIGHT
{
"a": {
"bb": [
1,
2,
4
],
"cc": {
"a": 1,
"b": 2,
"e": 4,
"f": 3
}
},
"common": 0,
"common2": 1,
"extra2": 3
}
$ ./dict-compare.py a.json b.json # -s DIFF
{
"extra2": 3,
"extra1": 3,
"common2": {
"left": 0,
"right": 1
},
"a": {
"cc": {
"e": 4,
"f": 3,
"c": 3,
"d": 4,
"a": {
"left": 11,
"right": 1
},
"b": 2
},
"bb": [
1,
{
"left": 3,
"right": 2
},
4
]
},
"common": 0
}
$ ./dict-compare.py a.json b.json --verbose # -s DIFF
{
"extra2": 3,
"extra1": 3,
"common2": {
"left": 0,
"right": 1,
"same": true
},
"common": {
"left": 0,
"right": 0,
"same": false
},
"a": {
"cc": {
"e": 4,
"f": 3,
"c": 3,
"d": 4,
"b": {
"left": 2,
"right": 2,
"same": false
},
"a": {
"left": 11,
"right": 1,
"same": true
}
},
"bb": [
{
"left": 1,
"right": 1,
"same": false
},
{
"left": 3,
"right": 2,
"same": true
},
{
"left": 4,
"right": 4,
"same": false
}
]
}
}
$ ./dict-compare.py a.json b.json -s MERGE
{
"extra2": 3,
"extra1": 3,
"common2": {
"left": 0,
"right": 1
},
"common": 0,
"a": {
"bb": [
1,
{
"left": 3,
"right": 2
},
4
],
"cc": {
"e": 4,
"f": 3,
"c": 3,
"d": 4,
"b": 2,
"a": {
"left": 11,
"right": 1
}
}
}
}
Last active
January 17, 2019 13:09
-
-
Save weaming/299d141be54a5578aa61233d048aa5a0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# coding: utf-8 | |
""" | |
Author : weaming | |
Created Time : 2019-01-17 14:34:26 | |
""" | |
import os | |
import sys | |
import json | |
import argparse | |
import datetime | |
from collections import OrderedDict | |
# Type groups used with isinstance() to classify values.
LIST_TYPES = (list, tuple)
DICT_TYPES = (dict, OrderedDict)
BASIC_TYPES = (int, float, str)

# Unique sentinel meaning "no value produced"; distinct from None, which is
# itself a legitimate value.
NOT_SET = object()

# enum consts: value categories
LIST, DICT, BASIC = "LIST", "DICT", "BASIC"

# enum consts: comparison strategies
LEFT, RIGHT, DIFF, MERGE = "LEFT", "RIGHT", "DIFF", "MERGE"
strategies = [LEFT, RIGHT, DIFF, MERGE]
def to_ordered_dict(data):
    """Return a copy of *data* in which every dict is an OrderedDict sorted by key.

    Dicts are converted recursively, including dicts nested inside lists or
    tuples. Lists stay lists and tuples stay tuples. Any other value (numbers,
    strings, None, ...) is returned unchanged, so the function is safe to call
    on any JSON-like value, not only on a top-level dict.
    """
    if isinstance(data, dict):
        # Keys are unique, so sorting the keys alone fully determines the order.
        return OrderedDict((key, to_ordered_dict(data[key])) for key in sorted(data))
    if isinstance(data, list):
        return [to_ordered_dict(item) for item in data]
    if isinstance(data, tuple):
        return tuple(to_ordered_dict(item) for item in data)
    return data
class DictDiffer:
    """Recursively compare two JSON-like values according to a strategy.

    Strategies (see the module-level ``strategies`` list):

    * ``LEFT`` / ``RIGHT`` - rebuild the structure of the left / right value.
    * ``DIFF`` / ``MERGE`` - build the union of both values; wherever two
      leaves differ the result holds ``{"left": ..., "right": ...}``.
      ``DIFF`` additionally honours the ``VERBOSE`` environment variable:
      when it is set, every compared leaf is reported as
      ``{"left": ..., "right": ..., "same": <bool>}``.

    Call :meth:`compare` to obtain the result.
    """

    def __init__(self, left, right, strategy):
        assert strategy in strategies
        self.left = left
        self.right = right
        self.strategy = strategy
        self.cursor = None  # key/index currently processed (LEFT/RIGHT only)
        self.result = None  # container being built by compare()

    @staticmethod
    def safe_get(data, cursor):
        """Return ``data[cursor]``, or None when the index/key is absent.

        Raises TypeError when *data* is neither list-like nor dict-like.
        """
        if isinstance(data, LIST_TYPES):
            return data[cursor] if cursor < len(data) else None
        if isinstance(data, DICT_TYPES):
            return data.get(cursor)
        raise TypeError(f"unknown type {type(data)}: {data}")

    @staticmethod
    def safe_set(data, cursor, value, ignore_noset=True):
        """Store *value* at ``data[cursor]``, growing a list with None if needed.

        When *ignore_noset* is true and *value* is the ``NOT_SET`` sentinel,
        nothing is stored. Raises TypeError for unsupported containers.
        """
        if ignore_noset and value is NOT_SET:
            return
        if isinstance(data, LIST_TYPES):
            # fill the gap between the current end and the target list index
            for _ in range(cursor + 1 - len(data)):
                data.append(None)
        elif not isinstance(data, DICT_TYPES):
            raise TypeError(f"unknown type {type(data)}: {data}")
        data[cursor] = value

    @staticmethod
    def get_type(data, allow_basic=False):
        """Classify *data* as ``LIST``, ``DICT`` or ``BASIC``.

        None is always ``BASIC``. Other scalar values are accepted only when
        *allow_basic* is true; otherwise (and for unknown types) TypeError is
        raised.
        """
        if data is None:
            return BASIC
        if isinstance(data, LIST_TYPES):
            return LIST
        if isinstance(data, DICT_TYPES):
            return DICT
        if allow_basic and isinstance(data, BASIC_TYPES):
            return BASIC
        raise TypeError(f"unknown type {type(data)}: {data}")

    @staticmethod
    def get_keys_iterator(data):
        """Return the indices of a list-like or the keys of a dict-like value."""
        if isinstance(data, LIST_TYPES):
            return range(len(data))
        if isinstance(data, DICT_TYPES):
            return data.keys()
        raise TypeError(f"unknown type {type(data)}: {data}")

    @staticmethod
    def same_length(a: list, b: list):
        """Return ``(a, b)`` as new lists padded with None to equal length.

        The argument order is preserved and the inputs are not modified.
        """
        size = max(len(a), len(b))
        padded_a = list(a) + [None] * (size - len(a))
        padded_b = list(b) + [None] * (size - len(b))
        return padded_a, padded_b

    def _compare_current(self):
        """Compare the items at ``self.cursor`` and store the outcome (LEFT/RIGHT)."""
        assert self.get_type(self.left) == self.get_type(self.right)
        assert self.strategy in [LEFT, RIGHT]
        # value
        left_value = self.safe_get(self.left, self.cursor)
        right_value = self.safe_get(self.right, self.cursor)
        value = DictDiffer(left_value, right_value, self.strategy).compare()
        if self.result is not None:
            self.safe_set(self.result, self.cursor, value)
        else:
            self.result = value

    def _merge_lists(self):
        """DIFF/MERGE on two lists: compare position by position."""
        padded_left, padded_right = self.same_length(self.left, self.right)
        compared = (
            DictDiffer(a, b, self.strategy).compare()
            for a, b in zip(padded_left, padded_right)
        )
        self.result.extend(x for x in compared if x is not NOT_SET)

    def _merge_dicts(self):
        """DIFF/MERGE on two dicts: copy one-sided keys, recurse on shared keys."""
        self.result.update(
            (k, v) for k, v in self.right.items() if k not in self.left
        )
        self.result.update(
            (k, v) for k, v in self.left.items() if k not in self.right
        )
        # go deeper; iterate in left's key order so the result is deterministic
        for key in [k for k in self.left if k in self.right]:
            value = DictDiffer(
                self.left[key], self.right[key], self.strategy
            ).compare()
            if value is not NOT_SET:
                self.result[key] = value

    def _loop_on_data(self):
        """Fill ``self.result`` from two containers of the same kind."""
        assert self.get_type(self.left) == self.get_type(self.right)
        if self.strategy in [LEFT, RIGHT]:
            source = self.left if self.strategy == LEFT else self.right
            for key in self.get_keys_iterator(source):
                # set cursor
                self.cursor = key
                self._compare_current()
        elif self.strategy in [DIFF, MERGE]:
            kind = self.get_type(self.left)
            if kind == LIST:
                self._merge_lists()
            elif kind == DICT:
                self._merge_dicts()
        else:
            raise ValueError(f"unknown strategy {self.strategy}")

    def compare(self):
        """Compare ``left`` with ``right`` and return the resulting structure.

        Two containers of the same kind are processed recursively. Anything
        else (scalars, None, or values of different kinds) is treated as a
        leaf:

        * ``LEFT`` / ``RIGHT`` return the respective side unchanged.
        * ``DIFF`` with the ``VERBOSE`` environment variable set returns
          ``{"left", "right", "same"}`` where ``same`` is true when both
          sides are equal.
        * ``DIFF`` / ``MERGE`` otherwise return the value itself when both
          sides are equal, else ``{"left": ..., "right": ...}``.
        """
        # split on different types
        left_type = self.get_type(self.left, allow_basic=True)
        right_type = self.get_type(self.right, allow_basic=True)

        if left_type == right_type and left_type != BASIC:
            self.result = [] if left_type == LIST else {}
            self._loop_on_data()
            return self.result

        if self.strategy == LEFT:
            return self.left
        if self.strategy == RIGHT:
            return self.right

        differs = self.left != self.right
        if self.strategy == DIFF and os.getenv("VERBOSE"):
            return {"left": self.left, "right": self.right, "same": not differs}
        if self.strategy in (DIFF, MERGE):
            if differs:
                return {"left": self.left, "right": self.right}
            return self.left
        return NOT_SET
def read_file(path, is_json=False):
    """Read the text file at *path* and return its content.

    ``~`` and environment variables in *path* are expanded first. The file is
    decoded as UTF-8. When *is_json* is true the content is parsed with
    ``json.loads`` and the parsed value is returned instead of the raw text.

    Returns None (after printing a warning to stderr) when *path* is not an
    existing regular file. Errors from ``json.loads`` propagate to the caller.
    """
    path = os.path.expanduser(path)
    path = os.path.expandvars(path)
    if not os.path.isfile(path):
        # stderr keeps the warning out of the JSON written to stdout
        print(f"warning: file {path} does not exist", file=sys.stderr)
        return None
    with open(path, encoding="utf-8") as f:
        content = f.read()
    return json.loads(content) if is_json else content
def print_json(data):
    """Print *data* to stdout as indented, non-ASCII-escaped JSON.

    *data* is first passed through ``to_ordered_dict``. Values the json
    module cannot encode natively are converted by a fallback: datetimes and
    dates become ISO 8601 strings, everything else becomes ``str(value)``.
    """

    def _fallback(value):
        is_temporal = isinstance(value, (datetime.datetime, datetime.date))
        return value.isoformat() if is_temporal else str(value)

    ordered = to_ordered_dict(data)
    text = json.dumps(ordered, ensure_ascii=False, indent=2, default=_fallback)
    print(text)
if __name__ == "__main__":
    # Command-line entry point: compare two JSON files and print the result.
    parser = argparse.ArgumentParser(description="Compare, diff, merge two json data")
    parser.add_argument("left", help="json file path as left data")
    parser.add_argument("right", help="json file path as right data")
    parser.add_argument(
        "-s",
        "--strategy",
        default=DIFF,
        help="""the strategy to produce result data.
        LEFT - only left have;
        RIGHT - only right have;
        DIFF - only different between left and right;
        MERGE - merge into one, conflicts left unmerged.
        """,
        choices=strategies,
    )
    parser.add_argument(
        "--verbose", action="store_true", help="output detail when use DIFF strategy"
    )
    args = parser.parse_args()

    if args.verbose:
        # DictDiffer.compare reads this flag through os.getenv("VERBOSE")
        os.environ["VERBOSE"] = "1"

    left = read_file(args.left, is_json=True)
    right = read_file(args.right, is_json=True)

    # read_file returns None for a missing file. Test for None explicitly so
    # that valid but empty documents ({} or []) are still compared.
    if left is None or right is None:
        sys.exit(1)

    result = DictDiffer(left, right, args.strategy).compare()
    print_json(result)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment