-
-
Save JimDennis/9d800d9eea1c5fa38c3b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
from __future__ import print_function | |
import json, sys | |
# Usage text printed to stderr when the script is run without arguments.
# The single %s placeholder is filled in with the program name (sys.argv[0]).
usage = \
'''Extract data from JSON structures using compact "paths" of keys/indexes
%s file [path [path ...]]
Given a JSON file and a list of paths (. separated list of keys) return the
item in the data structure extracted by traversing the path.
Given just a JSON filename, print a list of all paths found.
'''
class JSON_Walker(object):
    '''Callable which walks decoded JSON data and records the path to each leaf

    Calling an instance on decoded JSON data (nested dictionaries and
    lists) appends one dotted "path" (sequence of dictionary keys/list
    indexes) per leaf value to self.nodes and returns that list.  The
    first data passed in is kept in self.data so that a path can later be
    resolved by indexing the instance with it.

    >>> example = JSON_Walker()
    >>> example({"top": [{"frob": "baz", "foo": "bar"},
    ...     {"nums": [1, 2, 3]}]})
    ['top.0.frob', 'top.0.foo', 'top.1.nums.0', 'top.1.nums.1', 'top.1.nums.2']
    '''

    def __init__(self):
        self.nodes = list()   # leaf paths collected so far
        self.data = None      # first data structure handed to __call__

    def __call__(self, data, path=''):
        '''Recursively traverse a JSON data structure, adding the "path"
        to each leaf node to self.nodes

        data -- decoded JSON value (list, dictionary-like, or scalar)
        path -- dotted prefix accumulated so far; callers normally omit it

        Returns self.nodes.  Paths are appended, so calling the same
        instance again adds to the existing list.  Empty lists and empty
        dictionaries contain no leaves and therefore contribute no path.
        '''
        if self.data is None:
            self.data = data
        if isinstance(data, list):
            for index, item in enumerate(data):
                self(item, '%s.%s' % (path, index))
        elif hasattr(data, 'keys') and callable(data.keys):
            for key in data.keys():
                self(data[key], '%s.%s' % (path, key))
        else:
            # Every recursive step prepends a '.', so drop the leading one.
            self.nodes.append(path[1:])
        return self.nodes

    def __getitem__(self, item):
        '''Given a path return the data node at that path

        item -- either an integer, which indexes the list of recorded
                paths (this is also what lets an instance be iterated
                over), or a dotted path string with an optional leading
                dot.

        Raises KeyError if no data has been walked yet or if the path
        does not lead to a node.  A node whose value is JSON null is
        returned as None.

        >>> example = JSON_Walker()
        >>> paths = example({"top": [{"frob": "baz", "foo": "bar"},
        ...     {"nums": [1, 2, 3]}]})
        >>> example['.top.1.nums']
        [1, 2, 3]
        >>> example[0]
        'top.0.frob'
        '''
        if self.data is None:
            raise KeyError("No data")
        if isinstance(item, int):
            return self.nodes[item]
        if item.startswith('.'):
            item = item[1:]  # trim off leading dot
        path = str(item).split('.')
        node = self.data
        for part in path:
            if isinstance(node, list):
                # List components must be valid integer indexes.
                try:
                    node = node[int(part)]
                except (ValueError, IndexError):
                    raise KeyError("Key not found: %s" % path)
            elif hasattr(node, 'get') and part in node:
                node = node[part]
            else:
                # Missing key, or the path continues below a scalar leaf.
                raise KeyError("Key not found: %s" % path)
        return node

    def __repr__(self):
        # __repr__ must return a string, not the list itself.
        return repr(self.nodes)
## Following is alternative implementation of the __call__() function | |
## in the previous class: | |
def get_json_keys(data, path='', found=None):
    '''Given JSON data, collect the paths leading to each leaf item

    Recursively traverses a JSON data structure, appending each "path"
    (keys or indices, each preceded by a dot) which could be used to
    extract data in the leaves.

    data  -- decoded JSON value (list, dictionary-like, or scalar)
    path  -- dotted prefix accumulated so far; callers normally omit it
    found -- list to append paths to.  When omitted, the module-level
             list named "results" is used (and created if it does not
             exist yet), which keeps the original global-variable
             behaviour without failing when the module is imported.

    Returns the list the paths were appended to.  Empty lists and empty
    dictionaries contain no leaves and therefore contribute no path.

    >>> get_json_keys({"top": [{"frob": "baz", "foo": "bar"},
    ...     {"nums": [1, 2, 3]}]}, found=[])
    ['.top.0.frob', '.top.0.foo', '.top.1.nums.0', '.top.1.nums.1', '.top.1.nums.2']
    '''
    if found is None:
        found = globals().setdefault('results', [])
    if isinstance(data, list):
        for index, item in enumerate(data):
            get_json_keys(item, '%s.%s' % (path, index), found)
    elif hasattr(data, 'keys') and callable(data.keys):
        for key in data.keys():
            get_json_keys(data[key], '%s.%s' % (path, key), found)
    else:
        found.append(path)
    return found
def self_test(**opts):
    '''Run the doctests via doctest.testmod() and return its result

    Any keyword options (e.g. verbose=True) are forwarded unchanged to
    doctest.testmod().
    '''
    from doctest import testmod
    outcome = testmod(**opts)
    return outcome
if __name__ == '__main__':
    ## The alternative implementation (get_json_keys) requires a
    ## module-level global variable! :(
    results = list()
    args = sys.argv[1:]
    if not args:
        print(usage % sys.argv[0], file=sys.stderr)
        sys.exit(0)
    if args[0] in ('-t', '--test'):
        results = self_test(verbose=True)
        # First field of the doctest result is used as the exit status.
        sys.exit(results[0])
    fn = args[0]
    try:
        f = open(fn, 'r')
    except EnvironmentError as e:
        print('Unable to read %s: %s' % (fn, e), file=sys.stderr)
        sys.exit(1)
    # The with block closes the file even when sys.exit() is raised.
    with f:
        try:
            data = json.load(f)
        except ValueError as e:
            print('Unable to parse data from %s: %s' % (fn, e), file=sys.stderr)
            sys.exit(2)
    walk = JSON_Walker()
    walk(data)
    if len(args) == 1:
        # Only a filename was given: list every path found in the data.
        print('\n'.join(walk))
    else:  # It's more than 1 so:
        # Look up each requested path; report misses on stderr and keep going.
        results = list()
        for each in args[1:]:
            try:
                results.append(str(walk[each]))
            except KeyError:
                print('NotFound(%s)' % each, file=sys.stderr)
        print(' '.join(results))
## Stuff below doesn't work ... was for trying various other | |
## alternative implementations: | |
## for i in get_json_keys(data): | |
## print i | |
## print '\n'.join(get_json_keys(data, list())) | |
## get_json_keys(data) | |
## walk = JSON_Walker() | |
## print '\n'.join(walk(data)) | |
## print walk.results | |
## get_json_keys(data) | |
## print '\n'.join(results) |
Running this against the output of every aws ec2 describe-* command available to me I generated 32 files containing over 580,000 entries. The largest JSON output was from the aws ec2 describe-images command which generates 416,000 lines describing 18,922 images (and takes my xj.py code about two or three seconds to process).
Boiling this down (replacing all .xxxx. numeric index components with .*. and piping through sort -u) leaves me with 285 unique key-paths.
I'm thinking of adding two features to xj.py: one to allow one to extract keys matching a glob pattern, another to match on glob patterns in values while extracting some other data. So, for example, when you want to find the PublicIpAddress for an instance using its InstanceId you might use:
xj myinstances.json '*.InstanceId=i-1234abcd:*.PublicIpAddress'
Instead of something like:
aws ec2 describe-instances --instance-id i-1234abcd > this.json
xj this.json Instances.0.NetworkInterfaces.0.PublicIpAddresses.0.PublicIpAddress
I'll also add a feature that handles - as a filename, slurping data in from stdin and using json.loads() instead of json.load() on a file.
xj.py is a simple JSON parser and extractor. Mostly I've been using it as a utility for pulling data out of the JSON output from AWSCLI commands.
For example I might use a command like this:
... to extract the InstanceID and VPC Private IP address from the *.json file generated by the command:
aws ec2 describe-instances --instance-id i-10aabb00 | tee ./anode.json
If called without a "path" (list of keys) then this spits out a list of all the paths in the JSON, through all dictionaries and lists to any strings or numbers contained in the leaf nodes of the object tree.
The code is somewhat ugly; the only two approaches I've managed to figure out are:
I'm sure there must be a better way to return a list from a recursively called function; but I haven't been able to figure it out. I'm equally certain that there's some way to perform lazy evaluation ... perhaps with memoization ... to implement this as a generator or by implementing the iterator protocol in my class.
About the only good thing I can say about this code is that it does work and it seems to parse all of the JSON emitted by the AWS CLI (Python) tools. Obviously it would fail for any JSON containing keys with "." characters in them. However, that doesn't seem to be common; I haven't seen any cases of such keys in the AWS APIs.