Skip to content

Instantly share code, notes, and snippets.

@georgevreilly
Last active February 15, 2018 19:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save georgevreilly/3c25f345ac099644e088537c9fc08cdd to your computer and use it in GitHub Desktop.
Convert large JSON blob into JSONLines. Handles corrupt records. The blob must be a JSON array, containing flat (non-nested) objects.
#!/usr/bin/env python
from __future__ import unicode_literals, absolute_import, print_function
import argparse
import codecs
from collections import OrderedDict
import json
import os
import re
import six
import sys
try:
import ijson
except ImportError:
ijson = None
def parse_args(argv=None):
    """Parse command-line arguments.

    Returns an argparse.Namespace with ``input``, ``output``, and
    ``item_reader`` — a generator function that yields one record dict
    per element of the JSON array.
    """
    parser = argparse.ArgumentParser(
        description="Convert large JSON blob into JSONLines. Handles corrupt records.")
    parser.set_defaults(
        input='-',
        output='-',
        # Prefer the streaming ijson parser, but fall back to the regex
        # hack when ijson is not installed (ijson is set to None by the
        # import guard at the top of the file; defaulting to
        # blob_to_items would then crash at read time).
        item_reader=blob_to_items if ijson is not None else hack_to_lines,
    )
    parser.add_argument(
        "input", nargs="?",
        help="Where to read the JSON blob. '-' reads from stdin (default). "
             "The blob must be a JSON array, containing flat (non-nested) objects.")
    parser.add_argument(
        "--output", "-o",
        help="Where to write the JSONLines. '-' sends to stdout (default). "
             "A '.gz' or '.bz2' extension compresses.")
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "--stream", dest='item_reader',
        action='store_const', const=blob_to_items,
        help="Use a streaming JSON parser to read JSON blob. (default)")
    group.add_argument(
        "--hack", dest='item_reader',
        action='store_const', const=hack_to_lines,
        help="Split `},{` at comma. Doesn't work with nested objects.")
    namespace = parser.parse_args(argv)
    return namespace
BUFSIZE = 16 * 1024

# Boundary between two adjacent flat objects in a JSON array: `}, {"`.
MAP_SEPARATOR_RE = re.compile(r'},\s*{"')
# Last object of the array followed by the closing `]` (trailing whitespace ok).
FINAL_MAP_ITEM_RE = re.compile(r'^.*}\s*]\s*$')


def hack_to_lines(fp, buf_size=BUFSIZE, **kwargs):
    """Hackishly convert JSON array into JSONLines by splitting `}, {` at the comma.

    Expect a blob that looks like:
        [{a1: b1, c1: d1, ...}, {a2: b2, ...}, ..., {aN: bN, ...}]
    which will turn into:
        {a1: b1, c1: d1, ...}
        {a2: b2, ...}
        ...
        {aN: bN, ...}

    Records that fail to parse are reported on stderr and skipped.
    NB: the individual record objects must be flat; i.e., they must not
    contain nested objects.
    """
    # Reset binary file reader object to be a UTF-8 reader.
    # (read(0) probes the stream type without consuming anything.)
    if isinstance(fp.read(0), bytes):
        fp = codecs.getreader('utf-8')(fp)
    x = fp.read(1)
    if x != '[':
        raise ValueError("Stream doesn't begin with '['")
    buf = fp.read(buf_size)
    item_count = 0
    while True:
        match = MAP_SEPARATOR_RE.search(buf)
        if match:
            item_text = buf[:match.start() + 1]  # include the `}`
            try:
                item = json.loads(item_text, object_pairs_hook=OrderedDict)
                yield item
                item_count += 1
            except ValueError:
                # Diagnostics go to stderr: stdout is the default data
                # output and must carry only JSONLines.
                print("{}: Couldn't parse '{}'".format(item_count, item_text),
                      file=sys.stderr)
            buf = buf[match.end() - 2:]  # Start at next `{"`
        else:
            data = fp.read(buf_size)
            if data:
                buf += data
                continue
            # Stream exhausted: what's left should be the final `...}]`.
            if FINAL_MAP_ITEM_RE.match(buf):
                item_text = buf[:buf.rindex('}') + 1]
                try:
                    item = json.loads(item_text, object_pairs_hook=OrderedDict)
                    yield item
                    item_count += 1
                except ValueError:
                    print("{}: Couldn't parse '{}'".format(item_count, item_text),
                          file=sys.stderr)
            else:
                print("Expected trailing '{0}' in exhausted stream. Got: '{1}'".format(
                    '}]', buf[-20:]), file=sys.stderr)
            break
def blob_to_items(fp, **kwargs):
    """Use ijson streaming parser to convert JSON array into JSONLines.

    Expect a blob that looks like:
        [{a1: b1, c1: d1, ...}, {a2: b2, ...}, ..., {aN: bN, ...}]
    which will turn into:
        {a1: b1, c1: d1, ...}
        {a2: b2, ...}
        ...
        {aN: bN, ...}

    NB: the individual record objects must be flat; i.e., they must not
    contain nested objects.

    Raises:
        ImportError: if the optional ijson package is not installed.
        ValueError: if the blob is not an array, or a record is nested.
    """
    if ijson is None:
        # The top-of-file import guard sets ijson = None when unavailable;
        # fail with a clear message instead of an AttributeError below.
        raise ImportError("ijson is not installed; use --hack instead")
    ijp = ijson.parse(fp)
    prefix, event, value = next(ijp)
    if (prefix, event, value) != ('', 'start_array', None):
        raise ValueError("Stream doesn't begin with '['")
    item_count = 0
    while True:
        prefix, event, value = next(ijp)
        if (prefix, event, value) == ('', 'end_array', None):
            # Progress goes to stderr: stdout is the default data output.
            print("{} items read".format(item_count), file=sys.stderr)
            break
        if (prefix, event, value) != ('item', 'start_map', None):
            raise ValueError("Expected item starting with '{'")
        item = OrderedDict()
        while (prefix, event, value) != ('item', 'end_map', None):
            try:
                prefix, event, value = next(ijp)
            except ijson.JSONError:
                # Corrupt record: report and keep pulling events.
                # NOTE(review): if a backend raises repeatedly without
                # advancing, this could loop forever — confirm against
                # the ijson backend in use.
                print(item_count, item, file=sys.stderr)
                continue
            if (prefix, event) == ('item', 'map_key'):
                map_key = value
                prefix, event, value = next(ijp)
                if prefix == "item." + map_key:
                    # ijson backends report numeric values as 'integer'
                    # or 'double' (sometimes 'number'); accept all three.
                    if event in {"string", "number", "integer", "double",
                                 "boolean", "null"}:
                        item[map_key] = value
                    elif event == "start_map":
                        raise ValueError("Don't know how to handle nested object")
                    elif event == "start_array":
                        raise ValueError("Don't know how to handle nested array")
                    else:
                        raise ValueError("Don't know how to handle {}={}".format(event, value))
                else:
                    raise ValueError("Expected event={}; got {}".format("item." + map_key, event))
        yield item
        item_count += 1
def file_open(filename, mode='r', **kwargs):
    """Open *filename* for reading or writing; '-' means stdin/stdout.

    A '.gz', '.bz2', or '.xz' extension selects the matching
    (de)compression wrapper. Extra **kwargs pass through to the opener.
    """
    if filename == '-':
        stream = sys.stdin if mode.startswith('r') else sys.stdout
        if 'b' in mode:
            # Binary mode: use the underlying byte buffer on Python 3
            # (text sys.stdout would reject bytes); Python 2 std streams
            # have no .buffer and already accept bytes.
            stream = getattr(stream, 'buffer', stream)
        return stream
    filename = os.path.abspath(os.path.expanduser(filename))
    if filename.endswith('.gz'):
        import gzip
        return gzip.GzipFile(filename, mode=mode, **kwargs)
    elif filename.endswith('.bz2'):
        import bz2
        return bz2.BZ2File(filename, mode=mode, **kwargs)
    elif filename.endswith('.xz'):
        import lzma
        return lzma.LZMAFile(filename, mode=mode, **kwargs)
    else:
        return open(filename, mode=mode, **kwargs)
def file_close(fp):
    """Close *fp* unless it is None or a standard stream (or its buffer)."""
    std_streams = {sys.stdin, sys.stdout, sys.stderr}
    # Also protect the binary buffers of the std streams, which
    # file_open may hand out for '-' in binary mode on Python 3.
    std_streams.update(
        getattr(s, 'buffer', s) for s in (sys.stdin, sys.stdout, sys.stderr))
    if fp and fp not in std_streams:
        fp.close()
def main(argv=None):
    """Entry point: read a JSON array blob, emit one JSON object per line."""
    namespace = parse_args(argv)
    input_fp = output_fp = None
    try:
        input_fp = file_open(namespace.input, "rb")
        output_fp = file_open(namespace.output, "wb")
        for item in namespace.item_reader(input_fp):
            line = json.dumps(item, separators=(',', ':'))
            # Output files are opened in binary mode ('wb'), so the
            # JSON text must be encoded; writing str to a gzip/bz2/xz
            # file (or a byte buffer) raises TypeError on Python 3.
            output_fp.write(line.encode('utf-8'))
            output_fp.write(b'\n')
    finally:
        file_close(input_fp)
        file_close(output_fp)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment