Created
December 28, 2022 13:08
-
-
Save adonig/643954717bb937aed8722fedf9a5c4dc to your computer and use it in GitHub Desktop.
Repair invalid JSON documents in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# jsonrepair.py - Repair invalid JSON documents in Python | |
# | |
# Just https://github.com/josdejong/jsonrepair ported from TypeScript to Python. | |
# | |
# This port won't get updates, because the goal should be to generate this library instead. | |
# | |
# See: https://github.com/josdejong/jsonrepair/issues/84 | |
# | |
import json | |
import re | |
from typing import Optional | |
CONTROL_CHARACTERS = {"\b": "\\b", "\f": "\\f", "\n": "\\n", "\r": "\\r", "\t": "\\t"} | |
ESCAPE_CHARACTERS = { | |
'"': '"', | |
"\\": "\\", | |
"/": "/", | |
"b": "\b", | |
"f": "\f", | |
"n": "\n", | |
"r": "\r", | |
"t": "\t" | |
# note that \u is handled separately in parseString() | |
} | |
def remove_at_index(text: str, start: int, count: int) -> str: | |
return text[0:start] + text[start + count :] | |
def is_control_character(char: str) -> bool: | |
return char in CONTROL_CHARACTERS | |
def is_valid_string_character(char: str) -> bool: | |
return 0x20 <= ord(char) <= 0x10FFFF | |
def is_quote(char: str) -> bool: | |
return is_single_quote(char) or is_double_quote(char) | |
def is_single_quote(char: str) -> bool: | |
"""Test whether the given character is a single quote character. | |
Also tests for special variants of single quotes. | |
""" | |
return char in ( | |
"'", # U+0027 | |
"‘", # U+2018 | |
"’", # U+2019 | |
"`", # U+0060 | |
"´", # U+00B4 | |
) | |
def is_double_quote(char: str) -> bool: | |
return is_ascii_double_quote(char) or is_double_quote_left(char) or is_double_quote_right(char) | |
def is_ascii_double_quote(char: str) -> bool: | |
return char == '"' # U+0022 | |
def is_double_quote_left(char: str) -> bool: | |
return char == "“" # U+201C | |
def is_double_quote_right(char: str) -> bool: | |
return char == "”" # U+201D | |
def is_start_of_value(char: str) -> bool: | |
regex_start_of_value = r"^[[{\w-]$" # alpha, number, minus, or opening bracket or brace | |
return bool(re.search(regex_start_of_value, char)) or is_quote(char) | |
def ends_with_comma_or_newline(text: str) -> bool: | |
return bool(re.search(r"[,\n][ \t\r]*$", text)) | |
def is_whitespace(char: str) -> bool: | |
return char.isspace() | |
def is_special_whitespace(char: str) -> bool: | |
"""Check if the given character is a special whitespace character, some unicode variant""" | |
return ( | |
char == "\u00A0" # non-breaking space | |
or ord("\u2000") <= ord(char) <= ord("\u200A") | |
or char == "\u202F" | |
or char == "\u205F" | |
or char == "\u3000" | |
) | |
def insert_before_last_whitespace(text: str, text_to_insert: str) -> str: | |
index = len(text) | |
if not is_whitespace(text[index - 1]): | |
# no trailing whitespaces | |
return text + text_to_insert | |
while is_whitespace(text[index - 1]): | |
index -= 1 | |
return text[:index] + text_to_insert + text[index:] | |
def strip_last_occurrence(text: str, text_to_strip: str, strip_remaining: bool = False) -> str: | |
index = text.rindex(text_to_strip) | |
try: | |
return text[:index] + ("" if strip_remaining else text[index + 1 :]) | |
except ValueError: | |
return text | |
def is_hex(char: str) -> bool: | |
try: | |
int(char, 16) | |
return True | |
except ValueError: | |
return False | |
def is_delimiter(char: str) -> bool: | |
return char in ",:[]{}()\n'" or is_quote(char) | |
def at_end_of_block_comment(text: str, i: int) -> bool: | |
return text[i] == "*" and text[i + 1] == "/" | |
class JsonRepairError(Exception): | |
def __init__(self, message: str, position: int): | |
super(JsonRepairError, self).__init__(message + f" at position {position}") | |
self.position = position | |
class JsonRepair: | |
"""Repairs invalid JSON, i.e. change JavaScript notation into JSON notation. | |
Example: | |
try: | |
json = "{name: 'John'}" | |
repaired = JsonRepair(json).repair() | |
print(repaired) | |
# '{"name": "John"}' | |
except JsonRepairFailed as err: | |
print(err) | |
""" | |
def __init__(self, text: str): | |
self.text = text | |
self.i = 0 # current index in text | |
self.output = "" # generated output | |
def char(self, pos: int = 0) -> str: | |
return self.text[self.i + pos] | |
def inc(self, by: int = 1) -> None: | |
self.i += by | |
def dec(self, by: int = 1) -> None: | |
self.i -= by | |
def is_start_of_document(self, pos: int = 0) -> bool: | |
return self.i + pos == 0 | |
def is_end_of_document(self, pos: int = 0) -> bool: | |
return self.i + pos >= len(self.text) | |
def repair(self) -> str: | |
processed = self.parse_value() | |
if not processed: | |
raise self.unexpected_end() | |
processed_comma = self.parse_character(",") | |
if processed_comma: | |
self.parse_whitespace_and_skip_comments() | |
if not self.is_end_of_document() and is_start_of_value(self.char()) and ends_with_comma_or_newline(self.output): | |
# start of a new value after end of the root level object: looks like | |
# newline delimited JSON -> turn into a root level array | |
if not processed_comma: | |
# repair missing comma | |
self.output = insert_before_last_whitespace(self.output, ",") | |
self.parse_newline_delimited_json() | |
elif processed_comma: | |
# repair: remove trailing comma | |
self.output = strip_last_occurrence(self.output, ",") | |
if self.is_end_of_document(): | |
# reached the end of the document properly | |
return self.output | |
raise self.unexpected_character() | |
def parse_value(self) -> bool: | |
self.parse_whitespace_and_skip_comments() | |
processed = ( | |
self.parse_object() | |
or self.parse_array() | |
or self.parse_string() | |
or self.parse_number() | |
or self.parse_keywords() | |
or self.parse_unquoted_string() | |
) | |
self.parse_whitespace_and_skip_comments() | |
return processed | |
def parse_whitespace_and_skip_comments(self) -> bool: | |
start = self.i | |
changed = self.parse_whitespace() | |
while True: | |
changed = self.parse_comment() | |
if changed: | |
changed = self.parse_whitespace() | |
if not changed: | |
break | |
return self.i > start | |
def parse_whitespace(self) -> bool: | |
whitespace = "" | |
while not self.is_end_of_document(): | |
char = self.char() | |
normal = is_whitespace(char) | |
special = is_special_whitespace(char) | |
if not normal and not special: | |
break | |
if special: | |
whitespace += " " # repair special whitespace | |
else: | |
whitespace += char | |
self.inc() | |
if whitespace: | |
self.output += whitespace | |
return True | |
return False | |
def parse_comment(self) -> bool: | |
# find a block comment '/* ... */' | |
if not self.is_end_of_document() and not self.is_end_of_document(pos=+1): | |
if self.char() == "/" and self.char(pos=+1) == "*": | |
# repair block comment by skipping it | |
while not self.is_end_of_document() and not at_end_of_block_comment(self.text, self.i): | |
self.inc() | |
self.inc(by=2) | |
return True | |
# find a line comment '// ...' | |
if self.char() == "/" and self.char(pos=+1) == "/": | |
# repair line comment by skipping it | |
while not self.is_end_of_document() and self.char() != "\n": | |
self.inc() | |
return True | |
return False | |
def parse_character(self, char: str) -> bool: | |
if not self.is_end_of_document(): | |
if self.char() == char: | |
self.output += char | |
self.inc() | |
return True | |
return False | |
def skip_character(self, char: str) -> bool: | |
if not self.is_end_of_document() and self.char() == char: | |
self.inc() | |
return True | |
return False | |
def skip_escape_character(self) -> bool: | |
return self.skip_character("\\") | |
def parse_object(self) -> bool: | |
"""Parse an object like '{"key": "value"}'""" | |
if not self.is_end_of_document() and self.char() == "{": | |
self.output += "{" | |
self.inc() | |
self.parse_whitespace_and_skip_comments() | |
initial = True | |
while not self.is_end_of_document() and self.char() != "}": | |
if not initial: | |
processed_comma = self.parse_character(",") | |
if not processed_comma: | |
# repair missing comma | |
self.output = insert_before_last_whitespace(self.output, ",") | |
self.parse_whitespace_and_skip_comments() | |
else: | |
processed_comma = True | |
initial = False | |
processed_key = self.parse_string() or self.parse_unquoted_string() | |
if not processed_key: | |
if self.is_end_of_document() or self.char() in "{}[]": | |
# repair trailing comma | |
self.output = strip_last_occurrence(self.output, ",") | |
break | |
raise self.object_key_expected() | |
self.parse_whitespace_and_skip_comments() | |
processed_colon = self.parse_character(":") | |
if not processed_colon: | |
if is_start_of_value(self.char()): | |
# repair missing colon | |
self.output = insert_before_last_whitespace(self.output, ":") | |
else: | |
raise self.colon_expected() | |
processed_value = self.parse_value() | |
if not processed_value: | |
if processed_colon: | |
raise self.object_value_expected() | |
raise self.colon_expected() | |
if not self.is_end_of_document() and self.char() == "}": | |
self.output += "}" | |
self.inc() | |
else: | |
# repair missing end bracket | |
self.output = insert_before_last_whitespace(self.output, "}") | |
return True | |
return False | |
def parse_array(self) -> bool: | |
"""Parse an array like '["item1", "item2", ...]'""" | |
if not self.is_end_of_document() and self.char() == "[": | |
self.output += "[" | |
self.inc() | |
self.parse_whitespace_and_skip_comments() | |
initial = True | |
while not self.is_end_of_document() and self.char() != "]": | |
if not initial: | |
processed_comma = self.parse_character(",") | |
if not processed_comma: | |
# repair missing comma | |
self.output = insert_before_last_whitespace(self.output, ",") | |
else: | |
initial = False | |
processed_value = self.parse_value() | |
if not processed_value: | |
# repair trailing comma | |
self.output = strip_last_occurrence(self.output, ",") | |
break | |
if not self.is_end_of_document() and self.char() == "]": | |
self.output += "]" | |
self.inc() | |
else: | |
# repair missing closing array bracket | |
self.output = insert_before_last_whitespace(self.output, "]") | |
return True | |
return False | |
def parse_newline_delimited_json(self): | |
"""Parse and repair Newline Delimited JSON (NDJSON): | |
multiple JSON objects separated by a newline character | |
""" | |
# repair NDJSON | |
initial = True | |
processed_value = True | |
while processed_value: | |
if not initial: | |
# parse optional comma, insert when missing | |
processed_comma = self.parse_character(",") | |
if not processed_comma: | |
# repair: add missing comma | |
self.output = insert_before_last_whitespace(self.output, ",") | |
else: | |
initial = False | |
processed_value = self.parse_value() | |
if not processed_value: | |
# repair: remove trailing comma | |
self.output = strip_last_occurrence(self.output, ",") | |
# repair: wrap the output inside array brackets | |
self.output = f"[\n{self.output}\n]" | |
def parse_string(self) -> bool: | |
"""Parse a string enclosed by double quotes "...". Can contain escaped quotes | |
Repair strings enclosed in single quotes or special quotes | |
Repair an escaped string | |
""" | |
if not self.is_end_of_document(): | |
skip_escape_chars = self.char() == "\\" | |
if skip_escape_chars: | |
# repair: remove the first escape character | |
self.inc() | |
skip_escape_chars = True | |
if not self.is_end_of_document() and is_quote(self.char()): | |
is_end_quote = is_single_quote if is_single_quote(self.char()) else is_double_quote | |
if self.char() != '"': | |
pass # TODO?: repair non-normalized quote | |
self.output += '"' | |
self.inc() | |
while not self.is_end_of_document() and not is_end_quote(self.char()): | |
if self.char() == "\\": | |
char = self.char(pos=+1) | |
escape_char = ESCAPE_CHARACTERS.get(char) | |
if escape_char: | |
self.output += self.text[self.i : self.i + 2] | |
self.inc(by=2) | |
elif char == "u": | |
if ( | |
not self.is_end_of_document(pos=+5) | |
and is_hex(self.char(pos=+2)) | |
and is_hex(self.char(pos=+3)) | |
and is_hex(self.char(pos=+4)) | |
and is_hex(self.char(pos=+5)) | |
): | |
self.output += self.text[self.i : self.i + 6] | |
self.inc(by=6) | |
else: | |
raise self.invalid_unicode_character(self.i) | |
else: | |
# repair invalid escape character: remove it | |
self.output += char | |
self.inc(by=2) | |
else: | |
char = self.char() | |
if char == '"' and self.char(pos=-1) != "\\": | |
# repair unescaped double quote | |
self.output += "\\" + char | |
self.inc() | |
elif is_control_character(char): | |
# unescaped control character | |
self.output += CONTROL_CHARACTERS[char] | |
self.inc() | |
else: | |
if not is_valid_string_character(char): | |
raise self.invalid_character(char) | |
self.output += char | |
self.inc() | |
if skip_escape_chars: | |
processed = self.skip_escape_character() | |
if processed: | |
pass # repair: skipped escape character (nothing to do) | |
if not self.is_end_of_document() and is_quote(self.char()): | |
if self.char() != '"': | |
pass # TODO:? repair non-normalized quote | |
self.output += '"' | |
self.inc() | |
else: | |
# repair missing end quote | |
self.output += '"' | |
self.parse_concatenated_string() | |
return True | |
return False | |
def parse_concatenated_string(self) -> bool: | |
"""Repair concatenated strings like \"hello\" + \"world\", change this into \"helloworld\" """ | |
processed = False | |
self.parse_whitespace_and_skip_comments() | |
while not self.is_end_of_document() and self.char() == "+": | |
processed = True | |
self.inc() | |
self.parse_whitespace_and_skip_comments() | |
# repair: remove the end quote of the first string | |
self.output = strip_last_occurrence(self.output, '"', True) | |
start = len(self.output) | |
self.parse_string() | |
# repair: remove the start quote of the second string | |
self.output = remove_at_index(self.output, start, 1) | |
return processed | |
def parse_number(self) -> bool: | |
"""Parse a number like 2.4 or 2.4e6""" | |
if not self.is_end_of_document(): | |
start = self.i | |
if self.char() == "-": | |
self.inc() | |
err = self.expect_digit(start) | |
if err: | |
raise err | |
if not self.is_end_of_document() and self.char() == "0": | |
self.inc() | |
elif not self.is_end_of_document() and self.char() in "123456789": | |
self.inc() | |
while not self.is_end_of_document() and self.char().isdigit(): | |
self.inc() | |
if not self.is_end_of_document() and self.char() == ".": | |
self.inc() | |
err = self.expect_digit(start) | |
if err: | |
raise err | |
while not self.is_end_of_document() and self.char().isdigit(): | |
self.inc() | |
if not self.is_end_of_document() and self.char() in "eE": | |
self.inc() | |
if not self.is_end_of_document() and self.char() in "+-": | |
self.inc() | |
err = self.expect_digit(start) | |
if err: | |
raise err | |
while not self.is_end_of_document() and self.char().isdigit(): | |
self.inc() | |
if self.i > start: | |
self.output += self.text[start : self.i] | |
return True | |
return False | |
def parse_keywords(self) -> bool: | |
"""Parse keywords true, false, null | |
Repair Python keywords True, False, None | |
""" | |
return ( | |
self.parse_keyword("true", "true") | |
or self.parse_keyword("false", "false") | |
or self.parse_keyword("null", "null") | |
# repair Python keywords True, False, None | |
or self.parse_keyword("True", "true") | |
or self.parse_keyword("False", "false") | |
or self.parse_keyword("None", "null") | |
) | |
def parse_keyword(self, name: str, value: str) -> bool: | |
if self.text[self.i : self.i + len(name)] == name: | |
self.output += value | |
self.inc(by=len(name)) | |
return True | |
return False | |
def parse_unquoted_string(self) -> bool: | |
"""Repair and unquoted string by adding quotes around it | |
Repair a MongoDB function call like NumberLong("2") | |
Repair a JSONP function call like callback({...}); | |
""" | |
# note that the symbol can end with whitespaces: we stop at the next delimiter | |
start = self.i | |
while not self.is_end_of_document() and not is_delimiter(self.char()): | |
self.inc() | |
if self.i > start: | |
if not self.is_end_of_document() and self.char() == "(": | |
# repair a MongoDB function call like NumberLong("2") | |
# repair a JSONP function call like callback({...}); | |
self.inc() | |
self.parse_value() | |
if not self.is_end_of_document() and self.char() == ")": | |
# repair: skip close bracket of function call | |
self.inc() | |
if not self.is_end_of_document() and self.char() == ";": | |
# repair: skip semicolon after JSONP call | |
self.inc() | |
return True | |
# else repair unquoted string | |
# first, go back to prevent getting trailing whitespaces in the string | |
while not self.is_start_of_document() and is_whitespace(self.char(pos=-1)): | |
self.dec() | |
symbol = self.text[start : self.i] | |
self.output += json.dumps(symbol) | |
return True | |
return False | |
def expect_digit(self, start: int) -> Optional[JsonRepairError]: | |
if self.is_end_of_document() or not self.char().isdigit(): | |
num_so_far = self.text[start : self.i] | |
return JsonRepairError(f"Invalid number '{num_so_far}', expecting a digit {self.got()}", 2) | |
def invalid_character(self, char: str) -> JsonRepairError: | |
return JsonRepairError("Invalid character " + json.dumps(char), self.i) | |
def unexpected_character(self) -> JsonRepairError: | |
return JsonRepairError("Unexpected character " + json.dumps(self.text[self.i]), self.i) | |
def unexpected_end(self) -> JsonRepairError: | |
return JsonRepairError("Unexpected end of json string", len(self.text)) | |
def object_key_expected(self) -> JsonRepairError: | |
return JsonRepairError("Object key expected", self.i) | |
def object_value_expected(self) -> JsonRepairError: | |
return JsonRepairError("Object value expected", self.i) | |
def colon_expected(self) -> JsonRepairError: | |
return JsonRepairError("Colon expected", self.i) | |
def invalid_unicode_character(self, start: int) -> JsonRepairError: | |
end = start + 2 | |
while re.match(r"\w", self.text[end]): | |
end += 1 | |
chars = self.text[start:end] | |
return JsonRepairError(f'Invalid unicode character "{chars}"', self.i) | |
def got(self) -> str: | |
return f"but got '{self.char()}'" if not self.is_end_of_document() else "but reached end of input" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The ISC License | |
Copyright (c) 2020-2022 by Jos de Jong, Andreas Donig | |
Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted, provided that the above copyright notice and this permission notice appear in all copies. | |
THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test_jsonrepair.py - Testsuite for jsonrepair.py | |
# | |
# Just https://github.com/josdejong/jsonrepair/blob/develop/src/jsonrepair.test.ts ported to Python. | |
# | |
# This port won't get updates, because the goal should be to generate this library instead. | |
# | |
# See: https://github.com/josdejong/jsonrepair/issues/84 | |
# | |
from typing import Optional | |
import pytest | |
from .jsonrepair import JsonRepair, JsonRepairError | |
def checkvalid(text: str, expected: Optional[str] = None) -> None: | |
__tracebackhide__ = True | |
expected = text if expected is None else expected | |
actual = JsonRepair(text).repair() | |
if actual != expected: | |
pytest.fail(f"repair failed: {text}") | |
@pytest.mark.valid_json | |
def test_full_json_object(): | |
text = '{"a":2.3e100,"b":"str","c":null,"d":false,"e":[1,2,3]}' | |
parsed = JsonRepair(text).repair() | |
assert parsed == text, "should parse a JSON object correctly" | |
@pytest.mark.valid_json | |
def test_whitespace(): | |
checkvalid(" { \n } \t ") | |
@pytest.mark.valid_json | |
def test_object(): | |
checkvalid("{}") | |
checkvalid('{"a": {}}') | |
checkvalid('{"a": "b"}') | |
checkvalid('{"a": 2}') | |
@pytest.mark.valid_json | |
def test_array(): | |
checkvalid("[]") | |
checkvalid("[{}]") | |
checkvalid('{"a":[]}') | |
checkvalid('[1, "hi", true, false, null, {}, []]') | |
@pytest.mark.valid_json | |
def test_number(): | |
checkvalid("23") | |
checkvalid("0") | |
checkvalid("0e+2") | |
checkvalid("0.0") | |
checkvalid("-0") | |
checkvalid("2.3") | |
checkvalid("2300e3") | |
checkvalid("2300e+3") | |
checkvalid("2300e-3") | |
checkvalid("-2") | |
checkvalid("2e-3") | |
checkvalid("2.3e-3") | |
@pytest.mark.valid_json | |
def test_string(): | |
checkvalid('"str"') | |
checkvalid('"\\"\\\\\\/\\b\\f\\n\\r\\t"') | |
checkvalid('"\\u260E"') | |
@pytest.mark.valid_json | |
def test_keywords(): | |
checkvalid("true") | |
checkvalid("false") | |
checkvalid("null") | |
@pytest.mark.valid_json | |
def test_strings_equaling_json_delimiter(): | |
checkvalid('""') | |
checkvalid('"["') | |
checkvalid('"]"') | |
checkvalid('"{"') | |
checkvalid('"}"') | |
checkvalid('":"') | |
checkvalid('","') | |
@pytest.mark.valid_json | |
def test_unicode_characters_in_string(): | |
checkvalid('"★"') | |
checkvalid('"\u2605"') | |
checkvalid('"😀"') | |
checkvalid('"\ud83d\ude00"') | |
@pytest.mark.valid_json | |
def test_unicode_characters_in_key(): | |
checkvalid('{"★":true}') | |
checkvalid('{"\u2605":true}') | |
checkvalid('{"😀":true}') | |
checkvalid('{"\ud83d\ude00":true}') | |
@pytest.mark.invalid_json | |
def test_add_missing_quotes(): | |
checkvalid("abc", '"abc"') | |
checkvalid("hello world", '"hello world"') | |
checkvalid("{a:2}", '{"a":2}') | |
checkvalid("{a: 2}", '{"a": 2}') | |
checkvalid("{2: 2}", '{"2": 2}') | |
checkvalid("{true: 2}", '{"true": 2}') | |
checkvalid("{\n a: 2\n}", '{\n "a": 2\n}') | |
checkvalid("[a,b]", '["a","b"]') | |
checkvalid("[\na,\nb\n]", '[\n"a",\n"b"\n]') | |
@pytest.mark.invalid_json | |
def test_add_missing_end_quote(): | |
checkvalid('"abc', '"abc"') | |
checkvalid("'abc", '"abc"') | |
checkvalid("\u2018abc", '"abc"') | |
@pytest.mark.invalid_json | |
def test_single_quotes_become_double_quotes(): | |
checkvalid("{'a':2}", '{"a":2}') | |
checkvalid("{'a':'foo'}", '{"a":"foo"}') | |
checkvalid("{\"a\":'foo'}", '{"a":"foo"}') | |
checkvalid("{a:'foo',b:'bar'}", '{"a":"foo","b":"bar"}') | |
@pytest.mark.invalid_json | |
def test_special_quotes_become_double_quotes(): | |
checkvalid("{“a”:“b”}", '{"a":"b"}') | |
checkvalid("{‘a’:‘b’}", '{"a":"b"}') | |
checkvalid("{`a´:`b´}", '{"a":"b"}') | |
@pytest.mark.invalid_json | |
def test_string_content_untouched(): | |
checkvalid('"{a:b}"', '"{a:b}"') | |
@pytest.mark.invalid_json | |
def test_add_remove_escape_characters(): | |
checkvalid('"foo\'bar"', '"foo\'bar"') | |
checkvalid('"foo\\"bar"', '"foo\\"bar"') | |
checkvalid("'foo\"bar'", '"foo\\"bar"') | |
checkvalid("'foo\\'bar'", '"foo\'bar"') | |
checkvalid('"foo\\\'bar"', '"foo\'bar"') | |
checkvalid('"\\a"', '"a"') | |
@pytest.mark.invalid_json | |
def test_escape_unescaped_control_characters(): | |
checkvalid('"hello\bworld"', '"hello\\bworld"') | |
checkvalid('"hello\fworld"', '"hello\\fworld"') | |
checkvalid('"hello\nworld"', '"hello\\nworld"') | |
checkvalid('"hello\rworld"', '"hello\\rworld"') | |
checkvalid('"hello\tworld"', '"hello\\tworld"') | |
checkvalid('{"value\n": "dc=hcm,dc=com"}', '{"value\\n": "dc=hcm,dc=com"}') | |
@pytest.mark.invalid_json | |
def test_replace_special_white_space_characters(): | |
checkvalid('{"a":\u00a0"foo\u00a0bar"}', '{"a": "foo\u00a0bar"}') | |
checkvalid('{"a":\u202F"foo"}', '{"a": "foo"}') | |
checkvalid('{"a":\u205F"foo"}', '{"a": "foo"}') | |
checkvalid('{"a":\u3000"foo"}', '{"a": "foo"}') | |
@pytest.mark.invalid_json | |
def test_replace_non_normalized_left_right_quotes(): | |
checkvalid("\u2018foo\u2019", '"foo"') | |
checkvalid("\u201Cfoo\u201D", '"foo"') | |
checkvalid("\u0060foo\u00B4", '"foo"') | |
# mix single quotes | |
checkvalid("\u0060foo'", '"foo"') | |
checkvalid("\u0060foo'", '"foo"') | |
@pytest.mark.invalid_json | |
def test_remove_block_comments(): | |
checkvalid("/* foo */ {}", " {}") | |
checkvalid("{} /* foo */ ", "{} ") | |
checkvalid("{} /* foo ", "{} ") | |
checkvalid("\n/* foo */\n{}", "\n\n{}") | |
checkvalid('{"a":"foo",/*hello*/"b":"bar"}', '{"a":"foo","b":"bar"}') | |
@pytest.mark.invalid_json | |
def test_remove_line_comments(): | |
checkvalid("{} // comment", "{} ") | |
checkvalid('{\n"a":"foo",//hello\n"b":"bar"\n}', '{\n"a":"foo",\n"b":"bar"\n}') | |
@pytest.mark.invalid_json | |
def test_keep_comments_inside_string(): | |
checkvalid('"/* foo */"', '"/* foo */"') | |
@pytest.mark.invalid_json | |
def test_strip_jsonp_notation(): | |
# matching | |
checkvalid("callback_123({});", "{}") | |
checkvalid("callback_123([]);", "[]") | |
checkvalid("callback_123(2);", "2") | |
checkvalid('callback_123("foo");', '"foo"') | |
checkvalid("callback_123(null);", "null") | |
checkvalid("callback_123(true);", "true") | |
checkvalid("callback_123(false);", "false") | |
checkvalid("callback({}", "{}") | |
checkvalid("/* foo bar */ callback_123 ({})", " {}") | |
checkvalid("/* foo bar */ callback_123 ({})", " {}") | |
checkvalid("/* foo bar */\ncallback_123({})", "\n{}") | |
checkvalid("/* foo bar */ callback_123 ( {} )", " {} ") | |
checkvalid(" /* foo bar */ callback_123({}); ", " {} ") | |
checkvalid("\n/* foo\nbar */\ncallback_123 ({});\n\n", "\n\n{}\n\n") | |
# non-matching | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("callback {}").repair() | |
assert str(err.value) == 'Unexpected character "{" at position 9' | |
@pytest.mark.invalid_json | |
def test_repair_escaped_string_contents(): | |
checkvalid('\\"hello world\\"', '"hello world"') | |
checkvalid('\\"hello world\\', '"hello world"') | |
checkvalid('\\"hello \\\\"world\\\\"\\"', '"hello \\"world\\""') | |
checkvalid('[\\"hello \\\\"world\\\\"\\"]', '["hello \\"world\\""]') | |
checkvalid('{\\"stringified\\": \\"hello \\\\"world\\\\"\\"}', '{"stringified": "hello \\"world\\""}') | |
# the following is weird but understandable | |
checkvalid('[\\"hello\\, \\"world\\"]', '["hello, ","world\\\\","]"]') | |
# the following is sort of invalid: the end quote should be escaped too, | |
# but the fixed result is most likely what you want in the end | |
checkvalid('\\"hello"', '"hello"') | |
@pytest.mark.invalid_json | |
def test_strip_trailing_commas_from_array(): | |
checkvalid("[1,2,3,]", "[1,2,3]") | |
checkvalid("[1,2,3,\n]", "[1,2,3\n]") | |
checkvalid("[1,2,3, \n ]", "[1,2,3 \n ]") | |
checkvalid("[1,2,3,/*foo*/]", "[1,2,3]") | |
checkvalid('{"array":[1,2,3,]}', '{"array":[1,2,3]}') | |
# not matching: inside a string | |
checkvalid('"[1,2,3,]"', '"[1,2,3,]"') | |
@pytest.mark.invalid_json | |
def test_strip_trailing_commas_from_object(): | |
checkvalid('{"a":2,}', '{"a":2}') | |
checkvalid('{"a":2 , }', '{"a":2 }') | |
checkvalid('{"a":2 , \n }', '{"a":2 \n }') | |
checkvalid('{"a":2/*foo*/,/*foo*/}', '{"a":2}') | |
# not matching: inside a string | |
checkvalid('"{a:2,}"', '"{a:2,}"') | |
@pytest.mark.invalid_json | |
def test_strip_trailing_comma(): | |
checkvalid("4,", "4") | |
checkvalid("4 ,", "4 ") | |
checkvalid("4 , ", "4 ") | |
checkvalid('{"a":2},', '{"a":2}') | |
checkvalid("[1,2,3],", "[1,2,3]") | |
@pytest.mark.invalid_json | |
def test_add_missing_closing_bracket_for_object(): | |
checkvalid("{", "{}") | |
checkvalid('{"a":2', '{"a":2}') | |
checkvalid('{"a":2,', '{"a":2}') | |
checkvalid('{"a":{"b":2}', '{"a":{"b":2}}') | |
checkvalid('{\n "a":{"b":2\n}', '{\n "a":{"b":2\n}}') | |
checkvalid('[{"b":2]', '[{"b":2}]') | |
checkvalid('[{"b":2\n]', '[{"b":2}\n]') | |
checkvalid('[{"i":1{"i":2}]', '[{"i":1},{"i":2}]') | |
checkvalid('[{"i":1,{"i":2}]', '[{"i":1},{"i":2}]') | |
@pytest.mark.invalid_json | |
def test_add_missing_closing_bracket_for_array(): | |
checkvalid("[", "[]") | |
checkvalid("[1,2,3", "[1,2,3]") | |
checkvalid("[1,2,3,", "[1,2,3]") | |
checkvalid("[[1,2,3,", "[[1,2,3]]") | |
checkvalid('{\n"values":[1,2,3\n}', '{\n"values":[1,2,3]\n}') | |
checkvalid('{\n"values":[1,2,3\n', '{\n"values":[1,2,3]}\n') | |
@pytest.mark.invalid_json | |
def test_strip_mongodb_data_types(): | |
# simple | |
checkvalid('NumberLong("2")', '"2"') | |
checkvalid('{"_id":ObjectId("123")}', '{"_id":"123"}') | |
# extensive | |
mongo_document = ( | |
"{\n" | |
' "_id" : ObjectId("123"),\n' | |
' "isoDate" : ISODate("2012-12-19T06:01:17.171Z"),\n' | |
' "regularNumber" : 67,\n' | |
' "long" : NumberLong("2"),\n' | |
' "long2" : NumberLong(2),\n' | |
' "int" : NumberInt("3"),\n' | |
' "int2" : NumberInt(3),\n' | |
' "decimal" : NumberDecimal("4"),\n' | |
' "decimal2" : NumberDecimal(4)\n' | |
"}" | |
) | |
expected_json = ( | |
"{\n" | |
' "_id" : "123",\n' | |
' "isoDate" : "2012-12-19T06:01:17.171Z",\n' | |
' "regularNumber" : 67,\n' | |
' "long" : "2",\n' | |
' "long2" : 2,\n' | |
' "int" : "3",\n' | |
' "int2" : 3,\n' | |
' "decimal" : "4",\n' | |
' "decimal2" : 4\n' | |
"}" | |
) | |
checkvalid(mongo_document, expected_json) | |
@pytest.mark.invalid_json | |
def test_replace_python_constants(): | |
checkvalid("True", "true") | |
checkvalid("False", "false") | |
checkvalid("None", "null") | |
@pytest.mark.invalid_json | |
def test_turn_unknown_symbols_into_string(): | |
checkvalid("foo", '"foo"') | |
checkvalid("[1,foo,4]", '[1,"foo",4]') | |
checkvalid("{foo: bar}", '{"foo": "bar"}') | |
checkvalid("foo 2 bar", '"foo 2 bar"') | |
checkvalid("{greeting: hello world}", '{"greeting": "hello world"}') | |
checkvalid('{greeting: hello world\nnext: "line"}', '{"greeting": "hello world",\n"next": "line"}') | |
checkvalid("{greeting: hello world!}", '{"greeting": "hello world!"}') | |
@pytest.mark.invalid_json | |
def test_concatenate_strings(): | |
checkvalid('"hello" + " world"', '"hello world"') | |
checkvalid('"hello" +\n " world"', '"hello world"') | |
checkvalid('"a"+"b"+"c"', '"abc"') | |
checkvalid('"hello" + /*comment*/ " world"', '"hello world"') | |
checkvalid("{\n \"greeting\": 'hello' +\n 'world'\n}", '{\n "greeting": "helloworld"\n}') | |
@pytest.mark.invalid_json | |
def test_repair_missing_comma_between_array_items(): | |
checkvalid('{"array": [{}{}]}', '{"array": [{},{}]}') | |
checkvalid('{"array": [{} {}]}', '{"array": [{}, {}]}') | |
checkvalid('{"array": [{}\n{}]}', '{"array": [{},\n{}]}') | |
checkvalid('{"array": [\n{}\n{}\n]}', '{"array": [\n{},\n{}\n]}') | |
checkvalid('{"array": [\n1\n2\n]}', '{"array": [\n1,\n2\n]}') | |
checkvalid('{"array": [\n"a"\n"b"\n]}', '{"array": [\n"a",\n"b"\n]}') | |
# should leave normal array as is | |
checkvalid("[\n{},\n{}\n]") | |
@pytest.mark.invalid_json | |
def test_repair_missing_comma_between_object_properties(): | |
# checkvalid('{"a":2\n"b":3\n}', '{"a":2,\n"b":3\n}') | |
checkvalid('{"a":2\n"b":3\nc:4}', '{"a":2,\n"b":3,\n"c":4}') | |
@pytest.mark.invalid_json | |
def test_repair_missing_colon_between_object_key_and_value(): | |
checkvalid('{"a" "b"}', '{"a": "b"}') | |
checkvalid('{"a" 2}', '{"a": 2}') | |
checkvalid('{\n"a" "b"\n}', '{\n"a": "b"\n}') | |
checkvalid("{\"a\" 'b'}", '{"a": "b"}') | |
checkvalid("{'a' 'b'}", '{"a": "b"}') | |
checkvalid("{“a” “b”}", '{"a": "b"}') | |
checkvalid("{a 'b'}", '{"a": "b"}') | |
checkvalid("{a “b”}", '{"a": "b"}') | |
@pytest.mark.invalid_json | |
def test_repair_missing_combination_of_comma_quotes_and_brackets(): | |
checkvalid('{"array": [\na\nb\n]}', '{"array": [\n"a",\n"b"\n]}') | |
checkvalid("1\n2", "[\n1,\n2\n]") | |
checkvalid("[a,b\nc]", '["a","b",\n"c"]') | |
@pytest.mark.invalid_json | |
def test_repair_newline_separated_json(): # for example from MongoDB | |
text = "" + "/* 1 */\n" + "{}\n" + "\n" + "/* 2 */\n" + "{}\n" + "\n" + "/* 3 */\n" + "{}\n" | |
expected = "[\n\n{},\n\n\n{},\n\n\n{}\n\n]" | |
checkvalid(text, expected) | |
@pytest.mark.invalid_json | |
def test_repair_newline_separated_json_having_commas(): | |
text = "" + "/* 1 */\n" + "{},\n" + "\n" + "/* 2 */\n" + "{},\n" + "\n" + "/* 3 */\n" + "{}\n" | |
expected = "[\n\n{},\n\n\n{},\n\n\n{}\n\n]" | |
checkvalid(text, expected) | |
@pytest.mark.invalid_json | |
def test_repair_newline_separated_json_having_commas_and_trailing_comma(): | |
text = "" + "/* 1 */\n" + "{},\n" + "\n" + "/* 2 */\n" + "{},\n" + "\n" + "/* 3 */\n" + "{},\n" | |
expected = "[\n\n{},\n\n\n{},\n\n\n{}\n\n]" | |
checkvalid(text, expected) | |
@pytest.mark.invalid_json | |
def test_repair_comma_separated_list_with_value(): | |
checkvalid("1,2,3", "[\n1,2,3\n]") | |
checkvalid("1,2,3,", "[\n1,2,3\n]") | |
checkvalid("1\n2\n3", "[\n1,\n2,\n3\n]") | |
checkvalid("a\nb", '[\n"a",\n"b"\n]') | |
checkvalid("a,b", '[\n"a","b"\n]') | |
@pytest.mark.invalid_json | |
def test_throw_exception_in_case_of_non_repairable_issues(): | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("").repair() | |
assert str(err.value) == "Unexpected end of json string at position 0" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair('{"a",').repair() | |
assert str(err.value) == "Colon expected at position 4" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("{:2}").repair() | |
assert str(err.value) == "Object key expected at position 1" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair('{"a":2,]').repair() | |
assert str(err.value) == 'Unexpected character "]" at position 7' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair('{"a" ]').repair() | |
assert str(err.value) == "Colon expected at position 5" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("{}}").repair() | |
assert str(err.value) == 'Unexpected character "}" at position 2' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("[2,}").repair() | |
assert str(err.value) == 'Unexpected character "}" at position 3' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("2.3.4").repair() | |
assert str(err.value) == 'Unexpected character "." at position 3' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("2..3").repair() | |
assert str(err.value) == "Invalid number '2.', expecting a digit but got '.' at position 2" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("2e3.4").repair() | |
assert str(err.value) == 'Unexpected character "." at position 3' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("2e").repair() | |
assert str(err.value) == "Invalid number '2e', expecting a digit but reached end of input at position 2" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("-").repair() | |
assert str(err.value) == "Invalid number '-', expecting a digit but reached end of input at position 2" | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair("foo [").repair() | |
assert str(err.value) == 'Unexpected character "[" at position 4' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair('"\\u26"').repair() | |
assert str(err.value) == 'Invalid unicode character "\\u26" at position 1' | |
with pytest.raises(JsonRepairError) as err: | |
JsonRepair('"\\uZ000"').repair() | |
assert str(err.value) == 'Invalid unicode character "\\uZ000" at position 1' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment