Skip to content

Instantly share code, notes, and snippets.

@niccokunzmann
Last active October 10, 2023 18:17
Show Gist options
  • Save niccokunzmann/10015194 to your computer and use it in GitHub Desktop.
Save niccokunzmann/10015194 to your computer and use it in GitHub Desktop.
Decode large json files
"""
Response to
http://stackoverflow.com/a/22904200/1320237
"""
import json.scanner
import json.decoder
from json.decoder import JSONDecoder
class FileString(object):
    """Expose a seekable text file through a minimal string-like interface.

    Only integer indexing, contiguous slicing and ``len()`` are provided.
    Every access seeks in the underlying file and reads just the requested
    characters, so the whole document never has to be held in memory.

    NOTE(review): indices are passed straight to ``file.seek``. That is
    exact for ``io.StringIO``; for text files with multi-byte encodings it
    presumably is not -- confirm before relying on it.
    """

    def __init__(self, file, start=0):
        """Wrap *file*; index 0 of this object maps to offset *start*."""
        self._file = file
        self._start = start

    def seek(self, index):
        """Position the underlying file at *index* (relative to the start)."""
        self._file.seek(index + self._start)

    def __getitem__(self, item):
        """Return the character at an index or the text covered by a slice.

        Reading at or past the end of the file yields ``''`` (or a shorter
        string) instead of raising ``IndexError``, because that is what
        ``file.read`` returns there.

        Raises:
            ValueError: if a slice uses a step other than 1.
        """
        if isinstance(item, int):
            self.seek(item)
            return self._file.read(1)

        if item.step not in (None, 1):
            raise ValueError("FileString only supports slices with step 1")
        start = item.start or 0
        self.seek(start)
        if item.stop is None:
            # Open-ended slice: everything up to the end of the file.
            return self._file.read()
        # Read the whole requested span (an empty span reads nothing).
        return self._file.read(max(item.stop - start, 0))

    def __len__(self):
        """Return the number of characters from the start offset to EOF."""
        self._file.seek(0, 2)  # whence=2: seek relative to the end of file
        return self._file.tell() - self._start
class WHITESPACE_match(object):
    """Match-like object that skips JSON whitespace starting at ``end``.

    Replaces the compiled-regex matcher so that whitespace can be skipped
    on any object supporting integer indexing, one character at a time.
    Only ``end()`` of the regex match interface is provided.
    """
    # WHITESPACE = re.compile(r'[ \t\n\r]*', FLAGS)
    # WHITESPACE.match

    # A set (not a str) so that the empty string returned by a file-backed
    # string at end of input is not considered "contained".
    _JSON_WHITESPACE = frozenset(" \t\n\r")

    def __init__(self, string, end):
        """Scan *string* from index *end* past any run of `` \\t\\n\\r``."""
        self._string = string
        whitespace = self._JSON_WHITESPACE
        try:
            while string[end] in whitespace:
                end += 1
        except IndexError:
            # Ran off the end of a regular sequence: whitespace ends here.
            pass
        self._end = end

    def end(self):
        """Return the index of the first non-whitespace character."""
        return self._end
import re
class STRINGCHUNK_match(object):
    """Match-like object that reads one chunk of a JSON string literal.

    Starting at ``end`` it collects characters up to the first terminator:
    a double quote, a backslash, or a control character (``\\x00``-``\\x1f``).
    Only ``end()`` and ``groups()`` of the regex match interface are
    provided.
    """
    # STRINGCHUNK = re.compile(r'(.*?)(["\\\x00-\x1f])', FLAGS)
    # STRINGCHUNK.match

    def __init__(self, string, end):
        """Scan *string* from index *end* up to and including a terminator.

        Raises:
            ValueError: if the input ends before a terminator is found.
        """
        self._string = string
        begin = end
        pieces = []
        while True:
            try:
                c = string[end]
            except IndexError:
                c = ""
            if not c:
                # File-backed strings return '' at end of input; without
                # this check the loop would never terminate.
                raise ValueError(
                    "Unterminated string chunk starting at index %d" % begin)
            end += 1
            # The backslash must end the chunk too, so that the caller
            # gets a chance to decode the escape sequence that follows.
            if c == '"' or c == "\\" or c < " ":
                break
            pieces.append(c)
        self._content = "".join(pieces)
        self._terminator = c
        self._end = end

    def end(self):
        """Return the index just past the terminator."""
        return self._end

    def groups(self):
        """Return ``(content, terminator)`` like the regex's two groups."""
        return self._content, self._terminator
# Same pattern as the number regex of ``json.scanner``; it is applied to a
# small window copied out of the input instead of the input itself.
_NUMBER_RE = re.compile(
    r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?',
    (re.VERBOSE | re.MULTILINE | re.DOTALL))

# Initial number of characters copied out of the input for one match.
_NUMBER_WINDOW = 100


class _NumberMatch(object):
    """Regex match on a window, reporting positions in the full input."""

    def __init__(self, match, offset):
        self._match = match
        self._offset = offset

    def groups(self):
        """Return the ``(integer, frac, exp)`` groups of the match."""
        return self._match.groups()

    def end(self):
        """Return the end of the number as an index into the full input."""
        return self._match.end() + self._offset


class NUMBER_RE:
    """Replacement for ``json.scanner.NUMBER_RE`` that works on slices.

    Only ``match(s, index)`` is provided. It needs nothing from ``s`` but
    slicing, so ``s`` may be any string-like object.
    """

    @staticmethod
    def match(s, index):
        """Match a JSON number in *s* starting at *index*.

        Returns an object with ``groups()`` and ``end()`` (``end()`` is an
        index into *s*), or ``None`` when no number starts at *index*.
        """
        window = _NUMBER_WINDOW
        while True:
            chunk = s[index: index + window]
            # The chunk starts at ``index``, so match at its beginning.
            found = _NUMBER_RE.match(chunk)
            if found is None:
                return None
            # If the match ends within two characters of a full window,
            # the number (or a pending '.', 'e', 'e+', 'e-') may have been
            # cut off: retry with a bigger window.
            cut_off = (len(chunk) >= window
                       and len(chunk) - found.end() <= 2)
            if not cut_off:
                return _NumberMatch(found, index)
            window *= 2


json.scanner.NUMBER_RE = NUMBER_RE
class BigJSONDecoder(JSONDecoder):
    """JSON decoder for string-like objects that hold several documents."""

    def decode(self, s, _w=WHITESPACE_match):
        """Decode the first JSON document found in ``s``.

        Unlike ``JSONDecoder.decode`` this returns a tuple ``(obj, end)``:
        ``obj`` is the decoded document and ``end`` is the index at which
        the remaining input starts, or ``None`` when the document (plus
        trailing whitespace) reaches the end of ``s``.
        """
        def parse_object(*args, **kw):
            return json.decoder.JSONObject(*args, _w=WHITESPACE_match, **kw)

        def parse_array(*args, **kw):
            return json.decoder.JSONArray(*args, _w=WHITESPACE_match, **kw)

        # Both container parsers skip whitespace through ``_w``; their
        # default is a compiled regex, which cannot run on a non-str input,
        # so arrays need the replacement matcher just like objects.
        self.parse_object = parse_object
        self.parse_array = parse_array
        # Rebuild the scanner so that it picks up the parsers set above.
        self.scan_once = json.scanner.make_scanner(self)
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
        end = _w(s, end).end()
        if end != len(s):
            return obj, end
        return obj, None
def _scanstring_with_chunk_matcher(s, end, strict=True,
                                   _b=json.decoder.BACKSLASH):
    """Run the pure-Python string scanner with ``STRINGCHUNK_match``."""
    return json.decoder.py_scanstring(
        s, end, strict=strict, _b=_b, _m=STRINGCHUNK_match)


##scanstring = c_scanstring or py_scanstring
# Route every string scan of ``json.decoder`` through the wrapper above.
json.decoder.scanstring = _scanstring_with_chunk_matcher
# Always build scanners with the pure-Python implementation.
# NOTE(review): presumably because the C scanner insists on a real str --
# confirm.
json.scanner.make_scanner = json.scanner.py_make_scanner
if __name__ == '__main__':
    # Demo: write two JSON documents back to back into one stream and
    # decode them one after the other without loading the stream as a str.
    import io
    import json

    s = io.StringIO()
    json.dump({1: []}, s)
    json.dump({2: "hallo"}, s)
    print(repr(s.getvalue()))
    s.seek(0)

    # json.loads() only accepts str, bytes or bytearray on Python 3.6+ and
    # raises TypeError for anything else, so use the decoder directly.
    decoder = BigJSONDecoder()
    o1, idx1 = decoder.decode(FileString(s))
    print(o1)
    # idx1 is where the first document ended; the second one starts there.
    o2, idx2 = decoder.decode(FileString(s, idx1))
    print(o2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment