Skip to content

Instantly share code, notes, and snippets.

@taikedz
Last active January 11, 2024 10:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taikedz/e32493c144fd4087e7d1850c264258b9 to your computer and use it in GitHub Desktop.
Save taikedz/e32493c144fd4087e7d1850c264258b9 to your computer and use it in GitHub Desktop.
Extract data from matching entries in a JSON file
from argparse import ArgumentParser
import json
from typing import List
# Maps an operator name to a predicate ``(value, data) -> bool`` where
# ``value`` is the target string taken from the --where clause and ``data`` is
# the value found in the JSON entry.
OPERATIONS = {
    "starts": lambda value, data: data.startswith(value),
    "!starts": lambda value, data: not data.startswith(value),
    "ends": lambda value, data: data.endswith(value),
    "!ends": lambda value, data: not data.endswith(value),
    "has": lambda value, data: value in data,
    "!has": lambda value, data: value not in data,
    # Numeric comparisons: the target string is converted with float().
    "lt": lambda value, data: data < float(value),
    "lte": lambda value, data: data <= float(value),
    "gt": lambda value, data: data > float(value),
    "gte": lambda value, data: data >= float(value),
    # A bit special - the "is" only works when target value is the string "null".
    # The target is validated first so that an unsupported target is always
    # reported, not only when the data check happens to succeed.
    "is": lambda value, data: asserting(value, "null") and data is None,
    "!is": lambda value, data: asserting(value, "null") and data is not None,
}


def asserting(value, ok_values):
    """Check that ``value`` is one of ``ok_values`` and return ``True``.

    Args:
        value: The value to validate.
        ok_values: A single accepted value, or a list/tuple of accepted values.

    Returns:
        ``True`` when ``value`` is accepted, so the call can be chained in a
        boolean expression.

    Raises:
        AssertionError: If ``value`` is not among ``ok_values``.
    """
    if not isinstance(ok_values, (list, tuple)):
        ok_values = [ok_values]
    if value not in ok_values:
        # Raised explicitly instead of using an ``assert`` statement, which is
        # removed when Python runs with -O and would skip the validation.
        raise AssertionError(f"Invalid value: {value} . (Supported values: {ok_values})")
    return True
def parse_args(argv=None):
    """Build the command-line parser and parse the arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None`` (the
            default), argparse reads the process command line.

    Returns:
        The parsed namespace with ``files``, ``base``, ``where`` and
        ``extract`` attributes.
    """
    parser = ArgumentParser()
    # Example: check pasta dishes that don't have "pasta" in the name, but do
    # have it as an ingredient:
    #   json-select-where.py my-restaurant.json --base menu/dishes \
    #       --where="ingredients has pasta;name !has pasta" --extract "name;ingredients"
    parser.add_argument("files", help="The JSON files to read", nargs="+")
    parser.add_argument("--base", default=".", help=("The path to the base _array_ to iterate over "
        "(all path sections are slash-separated ('/') "
        "e.g. first item of an entry named 'things' would be 'things/0')"))
    parser.add_argument("--where", help=("<path> <operator> <target value>, semi-colon-separated, like 'ingredients has pasta; name !has pasta' operators are "
        "('starts', 'ends', 'has', '!starts', '!ends', '!has', "
        "'gt', 'gte', 'lt', 'lte', '<key> is null', '<key> !is null'). "
        "<path> is relative to the root of an individual entry. If <path> is not found, no match."))
    parser.add_argument("--extract", help=("Field paths to extract from each matching entry (e.g. 'utensils;spices')."
        " If unspecified, entire entry is retained."))
    return parser.parse_args(argv)
def getitem(object, path: str, absent_ok=False):
    """Return the value found at ``path`` inside ``object``.

    Args:
        object: The parsed JSON data to look into.
        path: Slash-separated path, resolved by ``_getitem_base``.
        absent_ok: When true, a failed lookup returns ``None`` instead of
            raising.

    Returns:
        The value at ``path``, or ``None`` if the lookup failed and
        ``absent_ok`` is true.

    Raises:
        KeyError, IndexError, ValueError: Propagated from ``_getitem_base``
            when the lookup fails and ``absent_ok`` is false.
    """
    try:
        return _getitem_base(object, path)
    except (KeyError, IndexError, ValueError):
        # Only the lookup failures raised by _getitem_base count as "absent";
        # anything else is a programming error and must not be hidden.
        if absent_ok:
            return None
        raise
def _getitem_base(object, path:str):
seen = []
path = path.split("/")
for token in path:
if token == ".": continue
seen.append(token)
if isinstance(object, dict):
if token not in object:
raise KeyError(f"No data at {seen}")
object = object[token]
elif isinstance(object, list):
try:
index = int(token)
except ValueError:
raise ValueError(f"Reached list at {seen[:-1]}, must dereference with int, not '{token}'")
if index >= len(object):
raise IndexError(f"Only {len(object)} items, tried to dereference {index}")
object = object[index]
else:
raise ValueError(f"Cannot dereference {type(object)} at {seen}")
return object
def matches(entry, where: str):
    """Tell whether ``entry`` satisfies every check in ``where``.

    Args:
        entry: One item of the base array.
        where: Semi-colon-separated checks, each of the form
            ``<path> <operator> <target-value>``.

    Returns:
        ``True`` if all checks pass, ``False`` as soon as one fails or its
        path yields no value.

    Raises:
        ValueError: A check does not have three components.
        NotImplementedError: The operator is not a key of ``OPERATIONS``.
    """
    checks = [c.strip().split(maxsplit=2) for c in where.split(";") if c.strip()]
    for ck in checks:
        try:
            target_key, operator, value = ck
        except ValueError:
            raise ValueError(
                f"Must have three components <path> <operator> <target-value> , got: {ck}"
            ) from None
        if operator not in OPERATIONS:
            raise NotImplementedError(f"Comparison not implemented: {operator}")
        target_value = getitem(entry, target_key, absent_ok=True)
        # Compare against None explicitly: legitimate falsy data such as 0,
        # "" or [] must still be handed to the operator instead of being
        # treated as a missing key.
        if target_value is None and operator not in ("is", "!is"):
            # Tried to compare a key that does not exist - this is akin to "not a match"
            return False
        if not OPERATIONS[operator](value, target_value):
            return False
    return True
def get_entries_for(data, where):
    """Return the items of ``data`` for which ``matches(item, where)`` is true.

    The items are returned in a new list, in their original order.
    """
    return [candidate for candidate in data if matches(candidate, where)]
def extract_parts(extract: str, valid_entries: List):
    """Reduce each entry to the requested field paths.

    Args:
        extract: Field paths separated by semi-colons (the separator shown in
            the command-line help) or by commas (the separator this function
            originally accepted).
        valid_entries: The entries to extract from.

    Returns:
        A list with one dict per entry, mapping each requested path to the
        value found there, or to ``None`` when the path is absent.
    """
    # Normalise both separators to one, and drop empty names left behind by a
    # trailing or doubled separator.
    names = (name.strip() for name in extract.replace(";", ",").split(","))
    targets = [name for name in names if name]
    return [
        {target: getitem(entry, target, absent_ok=True) for target in targets}
        for entry in valid_entries
    ]
def main():
    """Load the JSON files, filter and trim their entries, and print the result.

    The entries found at ``--base`` in every file are gathered into one list,
    filtered with ``--where`` when given, reduced with ``--extract`` when
    given, and printed as indented JSON.

    Raises:
        ValueError: The value at ``--base`` in a file is not an array.
    """
    args = parse_args()
    valid_entries = []
    for filepath in args.files:
        # Read as UTF-8 explicitly rather than relying on the platform's
        # default text encoding.
        with open(filepath, encoding="utf-8") as fh:
            data = json.load(fh)
        base_data = getitem(data, args.base)
        if not isinstance(base_data, list):
            # Extending with a dict or a string would silently add its keys
            # or characters as entries.
            raise ValueError(
                f"Expected an array at base path {args.base!r} in {filepath}, "
                f"found {type(base_data).__name__}"
            )
        valid_entries.extend(base_data)
    if args.where:
        valid_entries = get_entries_for(valid_entries, args.where)
    if args.extract:
        valid_entries = extract_parts(args.extract, valid_entries)
    print(json.dumps(valid_entries, indent=2))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment