Last active
February 15, 2018 19:59
-
-
Save georgevreilly/3c25f345ac099644e088537c9fc08cdd to your computer and use it in GitHub Desktop.
Convert large JSON blob into JSONLines. Handles corrupt records. The blob must be a JSON array, containing flat (non-nested) objects.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import unicode_literals, absolute_import, print_function | |
import argparse | |
import codecs | |
from collections import OrderedDict | |
import json | |
import os | |
import re | |
import six | |
import sys | |
try: | |
import ijson | |
except ImportError: | |
ijson = None | |
def parse_args(argv=None):
    """Parse command-line arguments.

    Args:
        argv: optional list of argument strings; None means use sys.argv[1:].

    Returns:
        argparse.Namespace with `input`, `output`, and `item_reader` set.
    """
    parser = argparse.ArgumentParser(
        description="Convert large JSON blob into JSONLines. Handles corrupt records.")
    # Defaults: read stdin, write stdout, use the streaming (ijson) reader.
    parser.set_defaults(
        input='-',
        output='-',
        item_reader=blob_to_items,
    )
    parser.add_argument(
        "input", nargs="?",
        help="Where to read the JSON blob. '-' reads from stdin (default). "
             "The blob must be a JSON array, containing flat (non-nested) objects.")
    parser.add_argument(
        "--output", "-o",
        # Extensions listed here must match those handled by file_open.
        help="Where to write the JSONLines. '-' sends to stdout (default). "
             "A '.gz', '.bz2', or '.xz' extension compresses.")
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "--stream", dest='item_reader',
        action='store_const', const=blob_to_items,
        help="Use a streaming JSON parser to read JSON blob. (default)")
    group.add_argument(
        "--hack", dest='item_reader',
        action='store_const', const=hack_to_lines,
        help="Split `},{` at comma. Doesn't work with nested objects.")
    namespace = parser.parse_args(argv)
    return namespace
BUFSIZE = 16 * 1024

# `},` + optional whitespace + `{"` marks the boundary between flat records.
MAP_SEPARATOR_RE = re.compile(r'},\s*{"')
# The last record: anything ending with `}`, then `]`, then optional whitespace.
FINAL_MAP_ITEM_RE = re.compile(r'^.*}\s*]\s*$')


def hack_to_lines(fp, buf_size=BUFSIZE, **kwargs):
    """Hackishly convert JSON array into JSONLines by splitting `}, {` at the comma.

    Expect a blob that looks like:
        [{a1: b1, c1: d1, ...}, {a2: b2, ...}, ..., {aN: bN, ...}]
    which will turn into:
        {a1: b1, c1: d1, ...}
        {a2: b2, ...}
        ...
        {aN: bN, ...}

    Yields each record as an OrderedDict. Records that fail to parse are
    reported on stderr and skipped.

    NB: the individual record objects must be flat; i.e., they must not contain nested objects.
    """
    # Reset binary file reader object to be a UTF-8 reader.
    # (`bytes` is `str` on Python 2, so this matches six.binary_type on both.)
    if isinstance(fp.read(0), bytes):
        fp = codecs.getreader('utf-8')(fp)
    x = fp.read(1)
    if x != '[':
        raise ValueError("Stream doesn't begin with '['")
    buf = fp.read(buf_size)
    item_count = 0
    while True:
        match = MAP_SEPARATOR_RE.search(buf)
        if match:
            item_text = buf[:match.start()+1]  # include the `}`
            try:
                item = json.loads(item_text, object_pairs_hook=OrderedDict)
                yield item
                item_count += 1
            except ValueError:
                # Diagnostics go to stderr so they can't corrupt the
                # JSONLines output when it is being written to stdout.
                print("{}: Couldn't parse '{}'".format(item_count, item_text),
                      file=sys.stderr)
            buf = buf[match.end() - 2:]  # Start at next `{"`
        else:
            data = fp.read(buf_size)
            if data:
                buf += data
            else:
                # Stream exhausted: remainder should be the final record + `]`.
                match = FINAL_MAP_ITEM_RE.match(buf)
                if match:
                    item_text = buf[:buf.rindex('}')+1]
                    try:
                        item = json.loads(item_text, object_pairs_hook=OrderedDict)
                        yield item
                        item_count += 1
                    except ValueError:
                        print("{}: Couldn't parse '{}'".format(item_count, item_text),
                              file=sys.stderr)
                else:
                    print("Expected trailing '{0}' in exhausted stream. Got: '{1}'".format(
                        '}]', buf[-20:]), file=sys.stderr)
                break
def blob_to_items(fp, **kwargs):
    """Use ijson streaming parser to convert JSON array into JSONLines.

    Expect a blob that looks like:
        [{a1: b1, c1: d1, ...}, {a2: b2, ...}, ..., {aN: bN, ...}]
    which will turn into:
        {a1: b1, c1: d1, ...}
        {a2: b2, ...}
        ...
        {aN: bN, ...}

    Yields each record as an OrderedDict. Corrupt events are reported on
    stderr and skipped.

    Raises:
        ValueError: if the stream isn't an array, or a record is nested.

    NB: the individual record objects must be flat; i.e., they must not contain nested objects.
    """
    ijp = ijson.parse(fp)
    prefix, event, value = next(ijp)
    if (prefix, event, value) != ('', 'start_array', None):
        raise ValueError("Stream doesn't begin with '['")
    item_count = 0
    while True:
        prefix, event, value = next(ijp)
        if (prefix, event, value) == ('', 'end_array', None):
            # Summary is diagnostic output: stderr, so it can't corrupt
            # the JSONLines stream when output goes to stdout.
            print("{} items read".format(item_count), file=sys.stderr)
            break
        if (prefix, event, value) != ('item', 'start_map', None):
            raise ValueError("Expected item starting with '{'")
        item = OrderedDict()
        while (prefix, event, value) != ('item', 'end_map', None):
            try:
                prefix, event, value = next(ijp)
            except ijson.JSONError:
                # Best-effort: report the partial record and keep scanning.
                print(item_count, item, file=sys.stderr)
                continue
            if (prefix, event) == ('item', 'map_key'):
                map_key = value
                prefix, event, value = next(ijp)
                if prefix == "item." + map_key:
                    if event in {"string", "number", "boolean", "null"}:
                        item[map_key] = value
                    elif event == "start_map":
                        raise ValueError("Don't know how to handle nested object")
                    elif event == "start_array":
                        raise ValueError("Don't know how to handle nested array")
                    else:
                        raise ValueError("Don't know how to handle {}={}".format(event, value))
                else:
                    raise ValueError("Expected event={}; got {}".format("item." + map_key, event))
        yield item
        item_count += 1
def file_open(filename, mode='r', **kwargs):
    """Open `filename` for reading or writing; '-' means stdin/stdout.

    A '.gz', '.bz2', or '.xz' extension selects the corresponding
    compressed-file reader/writer. `~` is expanded and the path is made
    absolute. Extra `kwargs` are passed through to the underlying opener.
    """
    if filename == '-':
        stream = sys.stdin if mode.startswith('r') else sys.stdout
        if 'b' in mode:
            # On Python 3, stdio streams are text; binary mode needs the
            # underlying byte stream. Python 2 stdio has no `.buffer` and
            # is already byte-oriented, hence the getattr fallback.
            stream = getattr(stream, 'buffer', stream)
        return stream
    else:
        filename = os.path.abspath(os.path.expanduser(filename))
        if filename.endswith('.gz'):
            import gzip
            return gzip.GzipFile(filename, mode=mode, **kwargs)
        elif filename.endswith('.bz2'):
            import bz2
            return bz2.BZ2File(filename, mode=mode, **kwargs)
        elif filename.endswith('.xz'):
            import lzma
            return lzma.LZMAFile(filename, mode=mode, **kwargs)
        else:
            return open(filename, mode=mode, **kwargs)
def file_close(fp):
    """Close `fp`, unless it is falsy or one of the standard streams."""
    if not fp:
        return
    std_streams = (sys.stdin, sys.stdout, sys.stderr)
    if all(fp is not stream for stream in std_streams):
        fp.close()
def main(argv=None):
    """Read a JSON-array blob and write it back out as JSONLines.

    Args:
        argv: optional argument list for parse_args; None means sys.argv[1:].
    """
    namespace = parse_args(argv)
    input_fp = output_fp = None
    try:
        input_fp = file_open(namespace.input, "rb")
        output_fp = file_open(namespace.output, "wb")
        for item in namespace.item_reader(input_fp):
            line = json.dumps(item, separators=(',', ':'))
            # The output was opened in binary mode, so encode to UTF-8
            # bytes (a no-op on Python 2, where str already is bytes).
            output_fp.write(line.encode('utf-8'))
            output_fp.write(b'\n')
    finally:
        file_close(input_fp)
        file_close(output_fp)
# Script entry point: run the converter when executed directly.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment