Python function to decode multiple JSON representations in a single file (stream)
# The MIT License
#
# Copyright (c) 2019 Neil Webber
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import json
#
# A MultiDecoder is a subclass of json.JSONDecoder and adds:
#
#   sizelimit - Object __init__() parameter. If set, protects against an
#         "unbounded stream of unresolved JSON/garbage" attack by capping
#         the max size for buffered data. Default is no limit.
#
#   [exception] SkipAllError - tells loadmultijson the string contained only
#         ignorable characters (e.g., whitespace), so it can eat them rather
#         than report them as a garbage tuple if they are trailing data.
#
# (a short sketch of this behavior follows the class definitions below)
#
class MultiDecoder(json.JSONDecoder):
    def __init__(self, *, sizelimit=0):
        self.__sizelimit = sizelimit
        super().__init__()

    def raw_decode(self, s):
        if self.__sizelimit > 0 and len(s) > self.__sizelimit:
            raise ValueError(f"JSON string exceeds limit {self.__sizelimit}")
        stripped = s.lstrip()       # because raw_decode doesn't allow
                                    # leading whitespace
        if len(stripped) == 0:
            raise SkipAllError()
        obj, n = super().raw_decode(stripped)
        n += len(s) - len(stripped)    # adjust to count the stripped chars
        return obj, n


# raised when the entire string should be discarded (e.g., is all whitespace)
class SkipAllError(Exception):
    pass
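

# An illustrative sketch of the raw_decode behavior described above: the
# returned offset counts any leading whitespace stripped before parsing,
# oversized input trips the sizelimit, and all-ignorable input raises
# SkipAllError. (The _demo_raw_decode name is purely illustrative and is
# not used elsewhere in this file.)
def _demo_raw_decode():
    d = MultiDecoder(sizelimit=16)
    obj, n = d.raw_decode('   [1, 2]')
    assert (obj, n) == ([1, 2], 9)       # 3 stripped spaces + 6 JSON chars

    try:
        d.raw_decode(' ' * 17)           # longer than sizelimit=16
    except ValueError:
        pass

    try:
        MultiDecoder().raw_decode('  ')  # nothing but ignorable characters
    except SkipAllError:
        pass
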
# loadmultijson is a generator that parses a stream containing multiple
# JSON representations, yielding a tuple (obj, str) as each one is parsed.
#
# For example if the input stream is:
# [1, 2, 3] {"k1": 42} 17
#
# three tuples will be generated (note that the str element includes any
# leading ignorable whitespace consumed along with the JSON itself):
#    ([1, 2, 3], '[1, 2, 3]')
#    ({'k1': 42}, ' {"k1": 42}')
#    (17, ' 17')
#
# If there is unparseable JSON in the stream it will be returned as the
# last tuple, as a 'garbage tuple' distinguished by the obj (first element)
# being None. For example:
#
# [1, 2, 3] ooops
#
# will yield:
#    ([1, 2, 3], '[1, 2, 3]')
#    (None, ' ooops')
#
# Be aware that any JSON syntax error in the stream will cause the entire
# rest of the stream to be consumed and an enormous garbage tuple to be
# returned, as the algorithm just tacks on more and more data so long as
# the current buffer will not parse. Consider for example a stream containing
# three JSON representations (the middle one being illegal):
# [1, 2, 3] {"bogus"} [ ...<enormous array here>... ]
# This will return one valid tuple, then one enormous garbage tuple containing
# the entire rest of the stream. There is no attempt to discern where an
# invalid JSON object "looks like it ends". The MultiDecoder (the
# default decoder) can optionally enforce a sizelimit to (crudely) control
# this. An ambitious custom decoder could try to recover the parse stream,
# possibly knowing something about the underlying application.
#
#
# REQUIRED POSITIONAL ARGUMENTS:
#
#   readable - anything that has an io-style .read() method
#
# OPTIONAL KEYWORD ARGUMENTS:
#
#   decoder - Nominally a JSON decoder object but the only required
#         method is raw_decode. By default it is a MultiDecoder.
#
#         NOTE: The decoder can raise SkipAllError to communicate that the
#         entire string it was given is ignorable characters. This allows
#         a decoder to cause trailing ignorable characters in a stream
#         (e.g., trailing whitespace) to be eaten rather than ultimately
#         returned as a trailing garbage tuple.
#
# (a usage sketch follows the function definition below)
def loadmultijson(readable, *, decoder=MultiDecoder()):
    # read() chunks in this size; arbitrary and ANY VALUE WORKS though
    # obviously there is a weak time/space tradeoff lurking in this choice.
    incrsize = 10000

    chunk = ""
    while len(newdata := readable.read(incrsize)) > 0:
        chunk += newdata
        while len(chunk) > 0:
            try:
                obj, n = decoder.raw_decode(chunk)
            except SkipAllError:
                chunk = ""
            except json.JSONDecodeError:
                break          # no complete json yet; append more data
            else:
                yield obj, chunk[:n]
                chunk = chunk[n:]

    # any leftover data is trailing garbage. Caller has to look for
    # this as a None JSON object in the tuple.
    if len(chunk) > 0:
        yield None, chunk
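

# A usage sketch of loadmultijson as documented above: feed a stream
# containing several JSON representations and collect the (obj, str)
# tuples, including the trailing-garbage case. (The _demo_loadmultijson
# name is purely illustrative and is not used elsewhere in this file.)
def _demo_loadmultijson():
    import io
    tuples = list(loadmultijson(io.StringIO('[1, 2, 3] {"k1": 42} 17')))
    assert [obj for obj, s in tuples] == [[1, 2, 3], {'k1': 42}, 17]

    # a stream ending in unparseable data yields a final garbage tuple
    tuples = list(loadmultijson(io.StringIO('[1, 2, 3] ooops')))
    assert tuples[-1][0] is None
    assert tuples[-1][1].lstrip() == 'ooops'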
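

# A sketch of using sizelimit to (crudely) cap the garbage-buffering
# behavior described above: the bogus leading object never parses, so the
# buffered data exceeds the limit and raw_decode raises ValueError. (The
# _demo_sizelimit name is purely illustrative.)
def _demo_sizelimit():
    import io
    f = io.StringIO('{"bogus"}' + '[0]' * 1000)
    try:
        list(loadmultijson(f, decoder=MultiDecoder(sizelimit=50)))
    except ValueError:
        pass       # the cap stopped the garbage from growing unbounded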
# TESTS
if __name__ == '__main__':
    import unittest
    import io
    import random

    class MJTestMethods(unittest.TestCase):
        def test_whitespace(self):
            teststring = " [1, 2, 3] \n "
            f = io.StringIO(teststring)
            tuples = list(loadmultijson(f))
            self.assertEqual(len(tuples), 1)
            obj, s = tuples[0]
            self.assertEqual(obj, [1, 2, 3])
            self.assertEqual(f.read(), '')    # should be at EOF

        def test_limit(self):
            teststring = "[1, 2, 3]"
            f = io.StringIO(teststring)
            decoder = MultiDecoder(sizelimit=len(teststring))
            tuples = list(loadmultijson(f, decoder=decoder))
            self.assertEqual(len(tuples), 1)
            obj, s = tuples[0]
            self.assertEqual(obj, [1, 2, 3])
            self.assertEqual(f.read(), '')    # should be at EOF

            f = io.StringIO(teststring)
            decoder = MultiDecoder(sizelimit=len(teststring)-1)
            with self.assertRaises(ValueError):
                tuples = list(loadmultijson(f, decoder=decoder))
            self.assertEqual(f.read(), '')    # should be at EOF

        def test_big(self):
            # this sort of assumes incrsize is lower than this
            biggerthanincr = 123456
            teststring = " " * biggerthanincr
            teststring += "[1, 2, 3]"
            teststring += " " * biggerthanincr
            f = io.StringIO(teststring)
            tuples = list(loadmultijson(f))
            self.assertEqual(len(tuples), 1)
            obj, s = tuples[0]
            self.assertEqual(obj, [1, 2, 3])
            self.assertEqual(f.read(), '')

        def test_random(self):
            # the goal here is to try to hit various boundary cases
            # XXX really should do an exhaustive analysis instead of random
            for iteration in range(200):
                ntuples = random.randint(10, 100)
                teststring = "\n" * random.randint(1000, 20000)
                for t in range(ntuples):
                    teststring += f"[{t}]"
                    teststring += " " * random.randint(1000, 20000)
                f = io.StringIO(teststring)
                tn = 0
                for obj, s in loadmultijson(f):
                    self.assertEqual(obj, [tn])
                    tn += 1
                self.assertEqual(tn, ntuples)
                self.assertEqual(f.read(), '')

        def test_tight(self):
            ntuples = 1000
            teststring = ""
            for t in range(ntuples):
                teststring += json.dumps([t]*(t+1))
            f = io.StringIO(teststring)
            tn = 0
            for obj, s in loadmultijson(f):
                self.assertEqual(obj, [tn]*(tn+1))
                tn += 1
            self.assertEqual(tn, ntuples)
            self.assertEqual(f.read(), '')

        def test_garbage(self):
            for bogon in ('[1, 2, 3]{"this is bogus"}',
                          ' [1, 2, 3] {"this is still bogus"} ',
                          ' [1, 2, 3] {"k1": {"this is nestedly bogus"}}'):
                f = io.StringIO(bogon)
                tuples = list(loadmultijson(f))
                self.assertEqual(len(tuples), 2)
                obj, s = tuples[0]
                self.assertEqual(obj, [1, 2, 3])
                self.assertEqual(f.read(), '')
                # garbage tuple
                obj, s = tuples[1]
                self.assertIsNone(obj)

    unittest.main()