Last active
January 11, 2024 10:58
-
-
Save taikedz/e32493c144fd4087e7d1850c264258b9 to your computer and use it in GitHub Desktop.
Extract data from matching entries in a JSON file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from argparse import ArgumentParser | |
import json | |
from typing import List | |
def _negated(check):
    """Wrap *check* so that the returned callable yields its logical inverse."""
    return lambda wanted, actual: not check(wanted, actual)


# Positive text/container checks. Each one also gets a "!"-prefixed inverse.
_TEXT_CHECKS = {
    "starts": lambda wanted, actual: actual.startswith(wanted),
    "ends": lambda wanted, actual: actual.endswith(wanted),
    "has": lambda wanted, actual: wanted in actual,
}

# Operator name -> callable(wanted, actual) returning a truth value, where
# `wanted` is the target value written in the where-clause (always a string)
# and `actual` is the data found in the entry being inspected.
OPERATIONS = {}
for _name, _check in _TEXT_CHECKS.items():
    OPERATIONS[_name] = _check
    OPERATIONS["!" + _name] = _negated(_check)

OPERATIONS.update({
    # Numeric comparisons: the where-clause value is converted to a float.
    "lt": lambda wanted, actual: actual < float(wanted),
    "lte": lambda wanted, actual: actual <= float(wanted),
    "gt": lambda wanted, actual: actual > float(wanted),
    "gte": lambda wanted, actual: actual >= float(wanted),
    # Null checks: the only accepted target value is the literal string
    # "null"; asserting() rejects anything else when it gets evaluated.
    "is": lambda wanted, actual: actual is None and asserting(wanted, "null"),
    "!is": lambda wanted, actual: actual is not None and asserting(wanted, "null"),
})
def asserting(value, ok_values):
    """Check that *value* is one of the permitted values.

    Args:
        value: The value to validate.
        ok_values: A single permitted value, or a list/tuple of them.

    Returns:
        True when *value* is permitted, so the call can be chained inside a
        boolean expression.

    Raises:
        AssertionError: If *value* is not among *ok_values*.
    """
    if not isinstance(ok_values, (list, tuple)):
        ok_values = [ok_values]
    # Raise explicitly instead of using an `assert` statement: asserts are
    # removed when Python runs with -O, which would silently accept bad input.
    if value not in ok_values:
        raise AssertionError(f"Invalid value: {value} . (Supported values: {ok_values})")
    return True
def parse_args():
    """Build the command-line parser and parse the process arguments.

    Returns:
        The argparse namespace with the attributes ``files`` (one or more
        paths), ``base`` (defaults to "."), ``where`` and ``extract``.
    """
    parser = ArgumentParser()

    # Example: find pasta dishes that list "pasta" as an ingredient but do not
    # have "pasta" in their name:
    #   json-select-where.py my-restaurant.json --base menu/dishes \
    #       --where="ingredients has pasta;name !has pasta" --extract "name,ingredients"
    parser.add_argument("files", help="The JSON files to read", nargs="+")
    parser.add_argument("--base", default=".", help=(
        "The path to the base _array_ to iterate over "
        "(all path sections are slash-separated ('/'), "
        "e.g. first item of an entry named 'things' would be 'things/0')"))
    parser.add_argument("--where", help=(
        "<path> <operator> <target value>, semi-colon-separated, "
        "like 'ingredients has pasta; name !has pasta'. Operators are "
        "('starts', 'ends', 'has', '!starts', '!ends', '!has', "
        "'gt', 'gte', 'lt', 'lte', '<key> is null', '<key> !is null'). "
        "<path> is relative to the root of an individual entry. "
        "If <path> is not found, no match."))
    parser.add_argument("--extract", help=(
        "Comma-separated field paths to extract from each matching entry "
        "(e.g. 'utensils,spices'). If unspecified, entire entry is retained."))

    return parser.parse_args()
def getitem(object, path:str, absent_ok=False):
    """Look up the value found at a slash-separated *path* inside *object*.

    Args:
        object: The parsed JSON data (nested dicts/lists) to walk.
        path: Slash-separated keys and list indices, e.g. "things/0".
        absent_ok: When true, a failed lookup returns None instead of raising.

    Returns:
        The value at *path*, or None when the lookup fails and *absent_ok*
        is true.

    Raises:
        KeyError, IndexError, ValueError: When the lookup fails and
            *absent_ok* is false.
    """
    try:
        return _getitem_base(object, path)
    # Only swallow the lookup failures the path walker raises on purpose;
    # anything else is a genuine bug and must not be hidden by absent_ok.
    except (LookupError, ValueError):
        if absent_ok:
            return None
        raise
def _getitem_base(object, path:str):
    """Walk *object* along a slash-separated *path* and return what is there.

    Dicts are dereferenced by key and lists by integer index (negative
    indices count from the end). A "." token is skipped, so the path "."
    returns *object* itself.

    Raises:
        KeyError: A dict on the way does not contain the requested key.
        IndexError: A list index is outside the list's bounds.
        ValueError: A list is addressed with a non-integer token, or the
            path continues into a value that is neither a dict nor a list.
    """
    seen = []  # tokens consumed so far, used only for error messages
    for token in path.split("/"):
        if token == ".":
            continue
        seen.append(token)

        if isinstance(object, dict):
            if token not in object:
                raise KeyError(f"No data at {seen}")
            object = object[token]
        elif isinstance(object, list):
            try:
                index = int(token)
            except ValueError:
                raise ValueError(
                    f"Reached list at {seen[:-1]}, must dereference with int, not '{token}'"
                ) from None
            # Check both ends so a too-negative index gets the same
            # descriptive error as a too-large one.
            if not -len(object) <= index < len(object):
                raise IndexError(f"Only {len(object)} items, tried to dereference {index}")
            object = object[index]
        else:
            raise ValueError(f"Cannot dereference {type(object)} at {seen}")
    return object
def matches(entry, where:str):
    """Tell whether *entry* satisfies every clause of a where-expression.

    Args:
        entry: One item of the base array.
        where: Semi-colon-separated clauses, each of the form
            "<path> <operator> <target-value>". Empty clauses are ignored.

    Returns:
        True when all clauses hold (or there are none), otherwise False.
        A clause whose path is absent, or holds null, only ever matches
        the "is" operator.

    Raises:
        ValueError: A clause does not have three components.
        NotImplementedError: A clause uses an unknown operator.
    """
    # maxsplit=2 keeps any spaces inside the target value intact.
    checks = [c.strip().split(maxsplit=2) for c in where.split(";") if c.strip()]

    for ck in checks:
        try:
            target_key, operator, value = ck
        except ValueError:
            raise ValueError(
                f"Must have three components <path> <operator> <target-value> , got: {ck}"
            ) from None

        if operator not in OPERATIONS:
            raise NotImplementedError(f"Comparison not implemented: {operator}")

        target_value = getitem(entry, target_key, absent_ok=True)

        # A missing key (None) is "not a match" for every operator except
        # the null checks. Test for None explicitly: falsy data such as 0,
        # "" or [] is real data and must still be compared.
        if target_value is None and operator not in ("is", "!is"):
            return False

        if not OPERATIONS[operator](value, target_value):
            return False

    return True
def get_entries_for(data, where):
    """Return, in their original order, the items of *data* that satisfy *where*."""
    return [candidate for candidate in data if matches(candidate, where)]
def extract_parts(extract:str, valid_entries:List):
    """Reduce each entry to the requested fields only.

    Args:
        extract: Field paths separated by commas or semi-colons,
            e.g. "name,ingredients" or "name;ingredients".
        valid_entries: The entries to extract the fields from.

    Returns:
        One dict per entry, mapping each requested path to the value found
        there, or to None when the entry has nothing at that path.
    """
    # Accept ';' as well as ',' so the separator matches the one used by the
    # where-expression; blank names (e.g. from a trailing separator) are dropped.
    targets = [t.strip() for t in extract.replace(";", ",").split(",") if t.strip()]
    return [
        {target: getitem(entry, target, absent_ok=True) for target in targets}
        for entry in valid_entries
    ]
def main():
    """Run the tool: read the files, filter and trim the entries, print JSON.

    The entries found at the base path of every input file are pooled. They
    are then filtered by the where-expression and reduced to the extracted
    fields, when those options are given, and printed as indented JSON.

    Raises:
        TypeError: The base path of a file does not point to an array.
    """
    args = parse_args()
    valid_entries = []

    for filepath in args.files:
        # Read as UTF-8 explicitly rather than with the locale's default
        # encoding, which varies between platforms.
        with open(filepath, encoding="utf-8") as fh:
            data = json.load(fh)

        base_data = getitem(data, args.base)
        # Extending with a dict or a string would silently add its keys or
        # characters as "entries", so reject anything that is not an array.
        if not isinstance(base_data, list):
            raise TypeError(
                f"Base path '{args.base}' in '{filepath}' must point to an array, "
                f"found {type(base_data).__name__}"
            )
        valid_entries.extend(base_data)

    if args.where:
        valid_entries = get_entries_for(valid_entries, args.where)

    if args.extract:
        valid_entries = extract_parts(args.extract, valid_entries)

    print(json.dumps(valid_entries, indent=2))
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment