Skip to content

Instantly share code, notes, and snippets.

@weaming
Last active January 17, 2019 13:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weaming/299d141be54a5578aa61233d048aa5a0 to your computer and use it in GitHub Desktop.
Save weaming/299d141be54a5578aa61233d048aa5a0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# coding: utf-8
"""
Author : weaming
Created Time : 2019-01-17 14:34:26
"""
import os
import sys
import json
import argparse
import datetime
from collections import OrderedDict
# Type families used by DictDiffer's isinstance() checks.
LIST_TYPES = (list, tuple)
DICT_TYPES = (dict, OrderedDict)
BASIC_TYPES = (int, float, str)

# Unique sentinel object; compared by identity (`is NOT_SET`), so it can
# never collide with a real value such as None.
NOT_SET = object()

# enum consts: kind of a value as reported by DictDiffer.get_type
LIST, DICT, BASIC = "LIST", "DICT", "BASIC"

# enum consts: result-building strategies accepted by DictDiffer
LEFT, RIGHT, DIFF, MERGE = "LEFT", "RIGHT", "DIFF", "MERGE"
strategies = [LEFT, RIGHT, DIFF, MERGE]
def to_ordered_dict(data):
    """Return a copy of *data* in which every dict is an OrderedDict sorted by key.

    Dicts are converted recursively, including dicts nested inside lists.
    Lists are rebuilt as new lists with their elements converted in place
    order.  Any other value (numbers, strings, None, tuples, ...) is returned
    unchanged, so the function is safe to call on any JSON root, not only on
    a dict.

    Raises:
        TypeError: if the keys of one dict cannot be ordered against each
            other (e.g. a mix of ``str`` and ``int`` keys).
    """
    if isinstance(data, dict):
        # Keys are unique, so ordering on the key alone is sufficient and
        # never tries to compare (possibly unorderable) values.
        return OrderedDict(
            (key, to_ordered_dict(value))
            for key, value in sorted(data.items(), key=lambda item: item[0])
        )
    if isinstance(data, list):
        return [to_ordered_dict(item) for item in data]
    return data
class DictDiffer:
    """Recursively compare two JSON-like values and build a result by strategy.

    Supported values are lists/tuples, dicts, ``int``/``float``/``str`` and
    ``None``.  The strategy (one of the module-level ``strategies``) decides
    what ``compare()`` produces:

    * ``LEFT`` / ``RIGHT`` - rebuild the structure of the chosen side.
    * ``DIFF`` / ``MERGE`` - combine both sides; leaves that differ become
      ``{"left": ..., "right": ...}``.
    """

    def __init__(self, left, right, strategy):
        assert strategy in strategies
        self.left = left
        self.right = right
        self.strategy = strategy
        # key / index currently being processed by _compare_current
        self.cursor = None
        # container being built; created in compare()
        self.result = None

    @staticmethod
    def safe_get(data, cursor):
        """Return ``data[cursor]``, or None when the index/key is missing.

        Raises:
            Exception: if *data* is neither a list/tuple nor a dict.
        """
        if isinstance(data, LIST_TYPES):
            if cursor + 1 <= len(data):
                return data[cursor]
            return None
        elif isinstance(data, DICT_TYPES):
            return data.get(cursor)
        else:
            raise Exception(f"unknown type {type(data)}: {data}")

    @staticmethod
    def safe_set(data, cursor, value, ignore_noset=True):
        """Store *value* at ``data[cursor]``, growing a list with None if needed.

        When *ignore_noset* is true and *value* is the ``NOT_SET`` sentinel,
        nothing is written.

        Raises:
            Exception: if *data* is neither a list/tuple nor a dict.
        """
        if ignore_noset and value is NOT_SET:
            return
        if isinstance(data, LIST_TYPES):
            # fill the gap between the target list index
            for _ in range(cursor + 1 - len(data)):
                data.append(None)
        elif isinstance(data, DICT_TYPES):
            pass
        else:
            raise Exception(f"unknown type {type(data)}: {data}")
        data[cursor] = value

    @staticmethod
    def get_type(data, allow_basic=False):
        """Classify *data* as ``LIST``, ``DICT`` or ``BASIC``.

        ``None`` is always ``BASIC``.  Other scalar values are only accepted
        when *allow_basic* is true.

        Raises:
            Exception: if *data* does not fall in an accepted category.
        """
        if data is None:
            return BASIC
        if isinstance(data, LIST_TYPES):
            return LIST
        elif isinstance(data, DICT_TYPES):
            return DICT
        elif allow_basic and isinstance(data, BASIC_TYPES):
            return BASIC
        else:
            raise Exception(f"unknown type {type(data)}: {data}")

    @staticmethod
    def get_keys_iterator(data):
        """Return the indexes of a list/tuple or the keys of a dict.

        Raises:
            Exception: if *data* is neither a list/tuple nor a dict.
        """
        if isinstance(data, LIST_TYPES):
            return range(len(data))
        elif isinstance(data, DICT_TYPES):
            return data.keys()
        else:
            raise Exception(f"unknown type {type(data)}: {data}")

    @staticmethod
    def same_length(a: list, b: list):
        """Return copies of *a* and *b* padded with None to the same length.

        The result is always ``(padded_a, padded_b)`` in the argument order,
        and the inputs themselves are never modified.
        """
        size = max(len(a), len(b))
        padded_a = list(a) + [None] * (size - len(a))
        padded_b = list(b) + [None] * (size - len(b))
        return padded_a, padded_b

    def _compare_current(self):
        """Compare the entries at ``self.cursor`` and store the outcome.

        Only used by the LEFT and RIGHT strategies.
        """
        assert self.get_type(self.left) == self.get_type(self.right)
        assert self.strategy in [LEFT, RIGHT]
        # value
        _left_v = self.safe_get(self.left, self.cursor)
        _right_v = self.safe_get(self.right, self.cursor)
        value = DictDiffer(_left_v, _right_v, self.strategy).compare()
        if self.result is not None:
            self.safe_set(self.result, self.cursor, value)
        else:
            self.result = value

    def _loop_on_data(self):
        """Fill ``self.result`` for two containers of the same kind."""
        assert self.get_type(self.left) == self.get_type(self.right)
        _iterator = NOT_SET
        if self.strategy == LEFT:
            _iterator = self.get_keys_iterator(self.left)
        elif self.strategy == RIGHT:
            _iterator = self.get_keys_iterator(self.right)
        elif self.strategy in [DIFF, MERGE]:
            t = self.get_type(self.left)
            if t == LIST:
                # Compare position by position; the shorter side is padded
                # with None so trailing items of the longer one are kept.
                rv = [
                    DictDiffer(a, b, self.strategy).compare()
                    for a, b in zip(*self.same_length(self.left, self.right))
                ]
                rv = [x for x in rv if x is not NOT_SET]
                self.result += rv
            elif t == DICT:
                # keys present on one side only are copied as they are
                self.result.update(
                    (k, v) for k, v in self.right.items() if k not in self.left
                )
                self.result.update(
                    (k, v) for k, v in self.left.items() if k not in self.right
                )
                # go deeper on the shared keys; iterating left's keys (rather
                # than a set intersection) keeps the result order stable
                rv = {
                    k: DictDiffer(self.left[k], self.right[k], self.strategy).compare()
                    for k in self.left
                    if k in self.right
                }
                rv = {k: v for k, v in rv.items() if v is not NOT_SET}
                self.result.update(rv)
        else:
            raise Exception(f"unknown strategy {self.strategy}")
        if _iterator is not NOT_SET:
            for x in _iterator:
                # set cursor
                self.cursor = x
                self._compare_current()

    def compare(self):
        """Run the comparison and return the result for the chosen strategy.

        When the two sides are of different kinds, or at least one of them is
        a basic value, the answer is decided here:

        * LEFT / RIGHT return that side unchanged.
        * DIFF with the ``VERBOSE`` environment variable set returns
          ``{"left", "right", "same"}`` where ``same`` tells whether both
          sides are equal.
        * DIFF (non-verbose) and MERGE return ``{"left", "right"}`` when the
          sides differ, otherwise the shared value.

        When both sides are lists, or both are dicts, a new container is
        built recursively and returned.

        Raises:
            Exception: if a value of an unsupported type is encountered.
        """
        # split on different types
        left_type = self.get_type(self.left, allow_basic=True)
        right_type = self.get_type(self.right, allow_basic=True)
        if left_type != right_type or BASIC in [left_type, right_type]:
            if self.strategy == LEFT:
                return self.left
            elif self.strategy == RIGHT:
                return self.right
            elif self.strategy == DIFF:
                if os.getenv("VERBOSE"):
                    return {
                        "left": self.left,
                        "right": self.right,
                        "same": self.left == self.right,
                    }
                if self.left != self.right:
                    return {"left": self.left, "right": self.right}
                else:
                    # NOTE(review): equal leaves are kept, so non-verbose DIFF
                    # output matches MERGE although the CLI help describes
                    # DIFF as "only different" - confirm intent before
                    # changing.
                    return self.left
            elif self.strategy == MERGE:
                if self.left != self.right:
                    return {"left": self.left, "right": self.right}
                else:
                    return self.left
            return NOT_SET
        else:
            t = self.get_type(self.left)
            if t == LIST:
                self.result = []
            elif t == DICT:
                self.result = {}
            self._loop_on_data()
            return self.result
def read_file(path, is_json=False):
    """Read a text file and return its content, optionally parsed as JSON.

    ``~`` and environment variables in *path* are expanded first.  The file
    is decoded as UTF-8 regardless of the platform's locale.

    Args:
        path: location of the file to read.
        is_json: when true, parse the content and return the decoded object
            instead of the raw text.

    Returns:
        The file content (``str``, or the decoded JSON value), or ``None``
        when *path* is not an existing regular file.  In that case a warning
        is written to stderr so it cannot mix with data printed on stdout.

    Raises:
        json.JSONDecodeError: if *is_json* is true and the content is not
            valid JSON.
    """
    path = os.path.expanduser(path)
    path = os.path.expandvars(path)
    if not os.path.isfile(path):
        print("warning: file {} does not exist".format(path), file=sys.stderr)
        return None
    with open(path, encoding="utf-8") as f:
        if is_json:
            return json.load(f)
        return f.read()
def print_json(data):
    """Print *data* to stdout as indented JSON with object keys sorted.

    Any JSON-serialisable root is accepted (dict, list or scalar).  Keys are
    sorted at every nesting level by ``json.dumps(sort_keys=True)`` and
    non-ASCII characters are written as they are.  ``datetime``/``date``
    objects are written in ISO 8601 form; any other value the encoder does
    not know is written as ``str(value)``.
    """

    def json_serializer(obj):
        # fallback used by json.dumps for objects it cannot encode itself
        if isinstance(obj, (datetime.datetime, datetime.date)):
            return obj.isoformat()
        return str(obj)

    print(
        json.dumps(
            data,
            ensure_ascii=False,
            indent=2,
            sort_keys=True,
            default=json_serializer,
        )
    )
if __name__ == "__main__":
    # Command line entry point: load two JSON files, combine them with the
    # selected strategy and print the result as JSON.
    parser = argparse.ArgumentParser(description="Compare, diff, merge two json data")
    parser.add_argument("left", help="json file path as left data")
    parser.add_argument("right", help="json file path as right data")
    parser.add_argument(
        "-s",
        "--strategy",
        default=DIFF,
        help="""the strategy to produce result data.
LEFT - only left have;
RIGHT - only right have;
DIFF - only different between left and right;
MERGE - merge into one, conflicts left unmerged.
""",
        choices=strategies,
    )
    parser.add_argument(
        "--verbose", action="store_true", help="output detail when use DIFF strategy"
    )
    args = parser.parse_args()
    if args.verbose:
        # DictDiffer.compare reads this variable to switch to verbose output
        os.environ["VERBOSE"] = "1"
    left = read_file(args.left, is_json=True)
    right = read_file(args.right, is_json=True)
    # read_file returns None for a missing file; test for None explicitly so
    # that valid but empty documents such as {} or [] are still compared.
    if left is not None and right is not None:
        result = DictDiffer(left, right, args.strategy).compare()
        print_json(result)
    else:
        sys.exit(1)
$ cat a.json b.json
{
  "a": {
    "bb": [
      1,
      3,
      4
    ],
    "cc": {
      "a": 11,
      "b": 2,
      "c": 3,
      "d": 4
    }
  },
  "common": 0,
  "common2": 0,
  "extra1": 3
}
{
  "a": {
    "bb": [
      1,
      2,
      4
    ],
    "cc": {
      "a": 1,
      "b": 2,
      "e": 4,
      "f": 3
    }
  },
  "common": 0,
  "common2": 1,
  "extra2": 3
}
$ ./dict-compare.py a.json b.json -s LEFT
{
  "a": {
    "bb": [
      1,
      3,
      4
    ],
    "cc": {
      "a": 11,
      "b": 2,
      "c": 3,
      "d": 4
    }
  },
  "common": 0,
  "common2": 0,
  "extra1": 3
}
$ ./dict-compare.py a.json b.json -s RIGHT
{
  "a": {
    "bb": [
      1,
      2,
      4
    ],
    "cc": {
      "a": 1,
      "b": 2,
      "e": 4,
      "f": 3
    }
  },
  "common": 0,
  "common2": 1,
  "extra2": 3
}
$ ./dict-compare.py a.json b.json  # -s DIFF
{
  "extra2": 3,
  "extra1": 3,
  "common2": {
    "left": 0,
    "right": 1
  },
  "a": {
    "cc": {
      "e": 4,
      "f": 3,
      "c": 3,
      "d": 4,
      "a": {
        "left": 11,
        "right": 1
      },
      "b": 2
    },
    "bb": [
      1,
      {
        "left": 3,
        "right": 2
      },
      4
    ]
  },
  "common": 0
}
$ ./dict-compare.py a.json b.json --verbose  # -s DIFF
{
  "extra2": 3,
  "extra1": 3,
  "common2": {
    "left": 0,
    "right": 1,
    "same": true
  },
  "common": {
    "left": 0,
    "right": 0,
    "same": false
  },
  "a": {
    "cc": {
      "e": 4,
      "f": 3,
      "c": 3,
      "d": 4,
      "b": {
        "left": 2,
        "right": 2,
        "same": false
      },
      "a": {
        "left": 11,
        "right": 1,
        "same": true
      }
    },
    "bb": [
      {
        "left": 1,
        "right": 1,
        "same": false
      },
      {
        "left": 3,
        "right": 2,
        "same": true
      },
      {
        "left": 4,
        "right": 4,
        "same": false
      }
    ]
  }
}
$ ./dict-compare.py a.json b.json -s MERGE
{
  "extra2": 3,
  "extra1": 3,
  "common2": {
    "left": 0,
    "right": 1
  },
  "common": 0,
  "a": {
    "bb": [
      1,
      {
        "left": 3,
        "right": 2
      },
      4
    ],
    "cc": {
      "e": 4,
      "f": 3,
      "c": 3,
      "d": 4,
      "b": 2,
      "a": {
        "left": 11,
        "right": 1
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment