Skip to content

Instantly share code, notes, and snippets.

@bencharb
Last active February 7, 2016 08:42
Show Gist options
  • Save bencharb/2bb6509bac93a5e1f28d to your computer and use it in GitHub Desktop.
Save bencharb/2bb6509bac93a5e1f28d to your computer and use it in GitHub Desktop.
Merge and sort dict collections and JSON files
import collections
import operator
import json
def merge_dicts(dicts=None, create_key_func=None):
    """Yield the records of several collections as one key-sorted stream.

    Only ``(collection index, record index)`` pairs are kept in memory,
    grouped by sort key. The records themselves are fetched again through
    ``dicts[i][j]`` at the moment they are yielded, so ``dicts`` may be a
    sequence whose items are loaded lazily on indexing.

    Args:
        dicts: Indexable sequence of indexable record collections.
            ``None`` is treated as "no collections".
        create_key_func: Callable mapping a record to its sort key. Keys
            must be hashable and mutually orderable.

    Yields:
        Records in ascending key order. Records sharing a key come out in
        collection order, then in their original order inside a collection.

    Raises:
        TypeError: If ``create_key_func`` is not callable and there is at
            least one record to key. Because this is a generator, the error
            surfaces on first iteration, not at call time.
    """
    if dicts is None:
        dicts = ()

    # Pass 1: remember where every record lives, grouped by its sort key.
    positions_by_key = collections.defaultdict(list)
    for collection_ix, collection in enumerate(dicts):
        for record_ix, record in enumerate(collection):
            key = create_key_func(record)
            positions_by_key[key].append((collection_ix, record_ix))

    # Pass 2: walk the keys in order. Positions were appended in ascending
    # (collection, record) order, so no per-key re-sort is required.
    cached_ix = None
    cached_collection = None
    for key in sorted(positions_by_key):
        for collection_ix, record_ix in positions_by_key[key]:
            # Indexing ``dicts`` can be expensive (e.g. a file read), so
            # reuse the last collection for consecutive records from it.
            if collection_ix != cached_ix:
                cached_collection = dicts[collection_ix]
                cached_ix = collection_ix
            yield cached_collection[record_ix]
class JsonPathFileOpenerList(list):
    """List of JSON file paths that hands out parsed file contents.

    The list stores only paths. Indexing and iteration open the file at the
    corresponding path and return its decoded JSON, so every access re-reads
    the file from disk; nothing is cached.
    """

    def get_record(self, path):
        """Return the decoded JSON content of the file at ``path``."""
        with open(path, 'r') as fin:
            return json.load(fin)

    def __getitem__(self, ix):
        """Return the decoded JSON content of the file stored at position ``ix``."""
        path = super(JsonPathFileOpenerList, self).__getitem__(ix)
        # Single loading code path, shared with iteration.
        return self.get_record(path)

    def __iter__(self):
        """Yield the decoded JSON content of each stored file, in list order."""
        for path in super(JsonPathFileOpenerList, self).__iter__():
            yield self.get_record(path)
def merge_json_files(files=None, target=None, sort_func=None, batch_write_size=10):
    """Merge the record lists of several JSON files into one sorted JSON file.

    Each input file is read through ``JsonPathFileOpenerList`` and the
    records are ordered by ``merge_dicts``. The result is written to
    ``target`` as a single JSON array, formatted exactly as
    ``json.dumps(all_records)`` would format it with default separators.

    Args:
        files: Iterable of input file paths. ``None`` is treated as no
            input files.
        target: Path of the output file; it is truncated if it exists.
            It is opened before any input is read, so it must not be one
            of ``files``.
        sort_func: Callable mapping a record to its sort key.
        batch_write_size: Number of records serialized per write call.
            Must be at least 1.

    Returns:
        ``target``, unchanged.

    Raises:
        ValueError: If ``batch_write_size`` is smaller than 1.
    """
    if batch_write_size < 1:
        raise ValueError(
            'batch_write_size must be >= 1, got %r' % (batch_write_size,))

    lazy_collections = JsonPathFileOpenerList(files if files is not None else [])
    merged = merge_dicts(dicts=lazy_collections, create_key_func=sort_func)

    # Plain 'w': the previous 'wa' is not a valid mode string and is
    # rejected outright by Python 3.
    with open(target, 'w') as fout:
        # Emit one array incrementally so the output stays valid JSON no
        # matter how many batches are flushed; an empty merge yields "[]".
        fout.write('[')
        separator = ''
        pending = []
        for record in merged:
            pending.append(json.dumps(record))
            if len(pending) >= batch_write_size:
                fout.write(separator + ', '.join(pending))
                separator = ', '
                pending = []
        if pending:
            fout.write(separator + ', '.join(pending))
        fout.write(']')
    return target
def test_merge_dicts():
    """Check that two interleaved, sorted collections merge into sorted order."""
    alph1 = 'a c e g i k'.split()
    alph2 = 'b d f h j l'.split()
    dct1 = [{'val': v} for v in alph1]
    dct2 = [{'val': v} for v in alph2]
    # The merged stream should list every letter a..l exactly once, in order.
    expected = [{'val': v} for v in 'a b c d e f g h i j k l'.split()]
    key_func = lambda r: r['val']
    merged = list(merge_dicts(dicts=[dct1, dct2], create_key_func=key_func))
    assert merged == expected
def test_merge_json_files():
    """Check that two JSON files merge into one sorted JSON array on disk.

    Fixture files live in a fresh temporary directory that is removed even
    when the assertion fails, so runs cannot collide or leave files behind.
    """
    import os
    import shutil
    import tempfile

    alph1 = 'a c e g i k'.split()
    alph2 = 'b d f h j l'.split()
    dct1 = [{'val': v} for v in alph1]
    dct2 = [{'val': v} for v in alph2]
    key_func = lambda r: r['val']
    expected_dict = [{"val": "a"}, {"val": "b"}, {"val": "c"}, {"val": "d"},
                     {"val": "e"}, {"val": "f"}, {"val": "g"}, {"val": "h"},
                     {"val": "i"}, {"val": "j"}, {"val": "k"}, {"val": "l"}]
    expected_json = json.dumps(expected_dict)

    workdir = tempfile.mkdtemp()
    f1 = os.path.join(workdir, 'fdct1.json')
    f2 = os.path.join(workdir, 'fdct2.json')
    target = os.path.join(workdir, 'fdctmerge1.json')
    try:
        # Write the input files.
        with open(f1, 'w') as fout:
            fout.write(json.dumps(dct1))
        with open(f2, 'w') as fout:
            fout.write(json.dumps(dct2))
        # Merge them.
        merge_json_files(files=[f1, f2], target=target, sort_func=key_func)
        # Compare the merged file content with the expected serialization.
        with open(target, 'r') as fin:
            assert fin.read() == expected_json
    finally:
        # Remove inputs and output regardless of the test outcome.
        shutil.rmtree(workdir)
## Uncomment to test
# test_merge_dicts()
# test_merge_json_files()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment