Skip to content

Instantly share code, notes, and snippets.

@ekimekim
Last active March 13, 2019 23:54
Show Gist options
  • Save ekimekim/940688f2d4c89a0c5c4cd06bb3509a90 to your computer and use it in GitHub Desktop.
Save ekimekim/940688f2d4c89a0c5c4cd06bb3509a90 to your computer and use it in GitHub Desktop.
A very hacky way of creating a python object which lazily looks up keys of a jsonnet object as needed, without re-evaluating the entire jsonnet each time.
import collections
import json
import threading
from Queue import Queue
import _jsonnet as jsonnet
class LazyExecutor(threading.Thread):
"""Runs a jsonnet snippet that produces an object.
You can subsequently request keys using get_key().
This will only evaluate and return the requested key, so you can effectively "lazily evaluate"
your jsonnet as you need each key. The jsonnet execution context is maintained throughout.
When you are done, you should stop() the thread to release resources.
"""
daemon = True
# Sentinel value that goes in queue to indicate a graceful shutdown
STOP = object()
def __init__(self, code):
super(LazyExecutor, self).__init__()
self.code = code
self.lock = threading.Lock() # governs access to queue, pending and values
# Ordered queue of keys to be requested. Requesters also check pending to ensure
# there are no repeats.
self.queue = Queue()
# Map from keys to get, to the Event object that threads are waiting on for that key
self.pending = {}
# We shouldn't repeat return values from callback, so we cache the value for each
# key we've already retrieved
self.values = {}
# Keeps track of what key we returned for each input, so that we don't break our
# pure-function guarentee if we're called twice.
self.return_values = {}
# Indicates if we initiated shut down cleanly
self.finished = False
# Indicates if we've stopped and can no longer service requests
self.stopped = False
# If we did not shut down cleanly, contains the error message
self.error = None
def run(self):
try:
jsonnet.evaluate_snippet(
filename="<inline>",
src="""
function(code)
local f(key) = f(
std.native("callback")(
key,
if key == null then null
else if key == true then std.manifestJson(std.objectFields(code))
else std.manifestJson(code[key]),
)
) tailstrict;
f(null)
""",
tla_codes={"code": self.code},
native_callbacks={"callback": (("key", "value"), self._callback)},
)
except Exception as e:
with self.lock:
# If this was a graceful stop, no further action needed
if self.finished:
return
# Otherwise, trigger all remaining waiters and record the error
self.stopped = True
self.error = e
for event in self.pending.values():
event.set()
def _callback(self, key, value):
# If this is a repeat call, return the same result and do nothing else
if key in self.return_values:
return self.return_values[key]
# Save passed back value (ignoring null case)
if key is not None:
value = json.loads(value)
with self.lock:
# save the value
assert key not in self.values
self.values[key] = value
# inform waiters
assert key in self.pending
event = self.pending.pop(key)
event.set()
# Return what key to ask for next, or stop if we're done
next_key = self.queue.get()
if next_key is self.STOP:
self.finished = True
raise Exception("done")
self.return_values[key] = next_key
return next_key
def get_key(self, key):
"""Can be called from another thread to retrieve a specific key"""
with self.lock:
if self.stopped:
raise ValueError("Cannot get key - executor is stopped")
if key in self.values:
return self.values[key]
if key not in self.pending:
self.pending[key] = threading.Event()
self.queue.put(key)
event = self.pending[key]
event.wait()
if key not in self.values:
# The only case where this can happen is on ungraceful shutdown
raise ValueError("Failed to get key: {}".format(self.error))
return self.values[key]
def get_keys(self):
"""As per get_key(), but returns the list of available keys"""
# True is used as a sentinel value that can pass the jsonnet-python barrier safely
return self.get_key(True)
def stop(self):
with self.lock:
if self.stopped:
return
self.stopped = True
self.queue.put(self.STOP)
class LazyProxy(collections.Mapping):
"""Wraps some jsonnet code that returns an object.
Acts like a dict, but only evaluates each key of the object the first time it's requested.
Call close() to release resources.
"""
def __init__(self, code):
self.executor = LazyExecutor(code)
self.executor.start()
def __iter__(self):
return iter(self.executor.get_keys())
def __len__(self):
return len(self.executor.get_keys())
def __getitem__(self, key):
return self.executor.get_key(key)
def __del__(self):
self.close()
def close(self):
self.executor.stop()
def test(code, *keys):
proxy = LazyProxy(code)
print proxy.keys()
for key in keys:
print proxy[key]
if __name__ == '__main__':
import argh
argh.dispatch_command(test)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment