Skip to content

Instantly share code, notes, and snippets.

@noxasaxon
Created February 27, 2021 22:40
Show Gist options
  • Save noxasaxon/ca78d8dc60e2057d1551eaf21145539a to your computer and use it in GitHub Desktop.
Save noxasaxon/ca78d8dc60e2057d1551eaf21145539a to your computer and use it in GitHub Desktop.
testing parameter resolver auto run jinja
import timeit
# root_object = {
# "artifacts": {
# "event": {
# "details": {
# "firstname": "Sterling",
# "middlename": "Malory",
# "lastname": "Archer",
# "vault_test" : "vault:socless_vault_tests.txt"
# }
# }
# }
# }
# vault_pattern = re.compile(r"(vault:)(\S+(?=\s|$))(.*)")
import_module = """
import re
# Prefix marking a legacy vault reference, e.g. "vault:<file_id>"
VAULT_TOKEN = "vault:"
# Prefix marking a legacy dotted-path reference into the root object, e.g. "$.artifacts.event"
PATH_TOKEN = "$."
# Separates a legacy reference from a trailing conversion name, e.g. "<reference>!json"
CONVERSION_TOKEN = "!"
from typing import Any
import json
from jinja2.nativetypes import NativeEnvironment
from jinja2 import select_autoescape, StrictUndefined
def fetch_from_vault(string_thing, content_only=True):
    # Stand-in used by this benchmark: nothing is fetched, the value passed
    # in is handed straight back. content_only is accepted but never read.
    return string_thing
# Jinja Environment Configuration
# NOTE: as observed by the author, this configuration fails to escape
# <script>; escaping does work when a regular Environment is used instead.
jinja_env = NativeEnvironment(
autoescape=select_autoescape(
["html", "xml"], default_for_string=True, default=True
),
variable_start_string="{", # token that opens an expression in a template
variable_end_string="}", # token that closes an expression in a template
undefined=StrictUndefined, # behaviour when a template references an undefined name
# StrictUndefined ensures that referencing something that does not exist
# in the context raises an error. Without StrictUndefined, invalid
# references fail silently.
# More on undefined types: https://jinja.palletsprojects.com/en/2.11.x/api/#undefined-types
)
# Define Custom Filters
def maptostr(target_list):
    # Jinja filter: build a new list holding str() of every element of
    # target_list, in the same order.
    return list(map(str, target_list))
def vault(vault_id: str):
    # Custom jinja function that returns the content of a socless vault.
    # Expected template usage is {vault(context.vault_id)}; the result is
    # meant to equal what the legacy vault:<vault-id> reference returns.
    vault_content = fetch_from_vault(vault_id, content_only=True)
    return vault_content
def fromjson(string_json: str) -> Any:
    # Custom jinja filter: takes stringified JSON and returns whatever
    # json.loads produces for it.
    parsed = json.loads(string_json)
    return parsed
# Callables that templates can invoke as global functions
custom_functions = {"vault": vault, "fromjson": fromjson}
# Every custom function is also usable as a filter; maptostr is filter-only
custom_filters = {"maptostr": maptostr, **custom_functions}
# Make both sets known to the shared environment
jinja_env.globals.update(custom_functions)
jinja_env.filters.update(custom_filters)
def render_jinja_from_string(template_string: str, root_object: dict) -> Any:
    # Build a template from template_string with the shared jinja_env and
    # render it, exposing root_object to the template under the name "context".
    return jinja_env.from_string(template_string).render(context=root_object)
############################################################################
class OLDParameterResolver:
    # Legacy (pre-jinja) parameter resolver, kept as the benchmark baseline.
    #
    # String values are interpreted as follows:
    #   starts with PATH_TOKEN  -> looked up key by key inside root_obj
    #   starts with VAULT_TOKEN -> fetched through fetch_from_vault
    #   anything else           -> returned unchanged
    # A reference may carry a conversion after CONVERSION_TOKEN; only "json"
    # converts the resolved value. dicts and lists are resolved recursively,
    # every other type is returned as-is.

    def __init__(self, root_obj):
        # root_obj: the object that path references are resolved against
        self.root_obj = root_obj

    def resolve_jsonpath(self, path):
        # Follow the dot-separated keys that come after PATH_TOKEN, starting
        # at a shallow copy of root_obj, and return the value reached.
        # Raises SoclessException when a key has to be looked up on a value
        # that has no .get (for example None left behind by a missing key).
        _pre, _sep, post = path.partition(PATH_TOKEN)
        keys = post.split(".")
        obj_copy = self.root_obj.copy()
        for key in keys:
            try:
                value = obj_copy.get(key)
            except AttributeError as err:
                raise SoclessException(
                    f"Unable to resolve key {key}, parent object does not exist. Full path: {path}"
                ) from err
            # A vault reference met along the way is replaced by the vault
            # content before the walk continues.
            if isinstance(value, str) and value.startswith(VAULT_TOKEN):
                obj_copy = self.resolve_vault_path(value)
            else:
                obj_copy = value
        return obj_copy

    def resolve_vault_path(self, path):
        # Strip everything up to and including VAULT_TOKEN and fetch the
        # content stored under the remaining file id.
        _, __, file_id = path.partition(VAULT_TOKEN)
        return fetch_from_vault(file_id, content_only=True)

    def resolve_reference(self, reference_path):
        # Resolve one value: containers recursively, strings according to
        # their prefix, everything else untouched.
        if isinstance(reference_path, dict):
            return {
                key: self.resolve_reference(value)
                for key, value in reference_path.items()
            }
        if isinstance(reference_path, list):
            return [self.resolve_reference(item) for item in reference_path]
        if not isinstance(reference_path, str):
            return reference_path

        if reference_path.startswith(PATH_TOKEN):
            resolve = self.resolve_jsonpath
        elif reference_path.startswith(VAULT_TOKEN):
            resolve = self.resolve_vault_path
        else:
            # plain string, not a reference
            return reference_path

        reference, _, conversion = reference_path.partition(CONVERSION_TOKEN)
        resolved = resolve(reference)
        if conversion:
            resolved = self.apply_conversion_from(resolved, conversion)
        return resolved

    def resolve_parameters(self, parameters):
        # Return a new dict with the same keys as parameters and every value
        # passed through resolve_reference.
        return {
            parameter: self.resolve_reference(reference)
            for parameter, reference in parameters.items()
        }

    def apply_conversion_from(self, data, conversion):
        # Convert resolved data according to the conversion name.
        # "json" parses data with json.loads. Any other name leaves data
        # unchanged instead of silently replacing it with None. The debug
        # prints that used to live here were removed so the benchmark output
        # and timings are not polluted.
        if conversion == "json":
            return json.loads(data)
        return data
class SoclessException(Exception):
    # Error type raised by the resolvers when a reference cannot be resolved
    # or converted to a template.
    ...
class ParameterResolver:
    # Resolver that hands every string value to resolve_string_parameter
    # together with root_obj. dicts and lists are walked recursively; values
    # of any other type are returned untouched.

    def __init__(self, root_obj):
        self.root_obj = root_obj

    def resolve_reference(self, reference_path):
        # Resolve a single value of any supported shape.
        if isinstance(reference_path, str):
            return resolve_string_parameter(reference_path, self.root_obj)
        if isinstance(reference_path, dict):
            return {
                name: self.resolve_reference(entry)
                for name, entry in reference_path.items()
            }
        if isinstance(reference_path, list):
            return [self.resolve_reference(entry) for entry in reference_path]
        return reference_path

    def resolve_parameters(self, parameters):
        # Return a new dict: same keys as parameters, each value resolved.
        return {
            name: self.resolve_reference(entry)
            for name, entry in parameters.items()
        }
def convert_deprecated_vault_to_template(vault_reference) -> str:
    # Turn a legacy vault reference into a jinja template string.
    #   "vault:<id>"              -> "{vault('<id>')}"
    #   "vault:<id>!<conversion>" -> "{vault('<id>') |fromjson}"
    # Everything before CONVERSION_TOKEN is the reference; whatever follows
    # VAULT_TOKEN inside that reference is the file id.
    reference, _, conversion = vault_reference.partition(CONVERSION_TOKEN)
    _, _, file_id = reference.partition(VAULT_TOKEN)

    # The id ends up inside a single-quoted template string literal and may
    # originate from rendered event data, so the backslash and the quote are
    # escaped to keep the id from closing the literal early. chr(92) is the
    # backslash, spelled this way because this code itself lives inside a
    # non-raw string literal.
    backslash = chr(92)
    quoted_id = file_id.replace(backslash, backslash * 2).replace(
        "'", backslash + "'"
    )

    template = f"vault('{quoted_id}')"
    if conversion:
        template = template + " |fromjson"
    return "{" + template + "}"
def convert_legacy_reference_to_template(reference_path: str) -> str:
    # Translate a legacy reference into the jinja template that replaces it.
    #   "$.a.b"       -> "{context.a.b}"
    #   "$.a.b!json"  -> "{context.a.b |fromjson}"
    #   "vault:<id>"  -> delegated to convert_deprecated_vault_to_template
    #   anything else -> returned unchanged (already a template or a plain string)
    # Raises SoclessException when reference_path cannot be handled as a
    # string reference.
    try:
        if reference_path.startswith(PATH_TOKEN):
            # Split the conversion suffix off first so that it does not leak
            # into the template expression.
            reference, _, conversion = reference_path.partition(CONVERSION_TOKEN)
            # Drop the leading "$" and keep the dotted path after it.
            template = f"context{reference[1:]}"
            if conversion:
                template = template + " |fromjson"
            return "{" + template + "}"
        if reference_path.startswith(VAULT_TOKEN):
            return convert_deprecated_vault_to_template(reference_path)
        return reference_path
    except (AttributeError, TypeError, KeyError) as e:
        # AttributeError is what a non-string reference_path produces on the
        # startswith call above.
        raise SoclessException(
            f"Unable to convert reference type {type(reference_path)} to template - {e}"
        ) from e
def resolve_string_parameter(parameter: str, root_object: dict) -> Any:
    # Convert parameter to a template (legacy references are translated,
    # other strings pass through), render it against root_object, and return
    # the rendered value.
    template = convert_legacy_reference_to_template(parameter)
    resolved = render_jinja_from_string(template, root_object)

    # A rendered string that itself starts with the vault token needs a
    # second trip through jinja to be replaced by the vault content.
    needs_vault_pass = isinstance(resolved, str) and resolved.startswith(VAULT_TOKEN)
    if not needs_vault_pass:
        # Autoescaping is currently disabled, so turning &#34; and &#39;
        # back into quote characters may not be necessary here.
        return resolved

    vault_template = convert_deprecated_vault_to_template(resolved)
    return render_jinja_from_string(vault_template, root_object)
###################_BEGIN_SETUP_##################
# Object that the "$." references in parameters are resolved against
root_obj = {
    "artifacts": {
        "event": {
            "details": {
                "firstname": "Sterling",
                "middlename": "Malory",
                "lastname": "Archer",
                "vault_test": "vault:socless_vault_tests.txt",
            },
        },
    },
}
# Input handed to resolve_parameters: path references, a plain string,
# a vault reference, and a list holding a nested dict of references
parameters = {
    "firstname": "$.artifacts.event.details.firstname",
    "lastname": "$.artifacts.event.details.lastname",
    "middlename": "Malory",
    "vault.txt": "vault:socless_vault_tests.txt",
    # "vault.json": "vault:socless_vault_tests.json!json",
    "acquaintances": [
        {
            "firstname": "$.artifacts.event.details.middlename",
            "lastname": "$.artifacts.event.details.lastname",
        },
    ],
}
"""
# Statement under measurement: build a resolver and resolve the sample
# parameters once. Swap the class name to benchmark the other resolver.
testcode = """
resolver = OLDParameterResolver(root_obj)
output = resolver.resolve_parameters(parameters)
"""
# 25 independent runs of a single execution each; report the fastest run.
timings = timeit.repeat(stmt=testcode, setup=import_module, repeat=25, number=1)
print(min(timings))
## resolver = OLDParameterResolver(root_obj)
# (each figure below is the fastest of `repeat` runs, in seconds, for `number` executions)
# repeat=25, number=1
# 0.000019248000000082754
# -----
# repeat=2, number=10000
# 0.12176976900000003
# -----
# repeat=5, number=100
# 0.0014764200000000338
# -----
# repeat=5, number=1000
# 0.012429356000000003
# ___________________
## resolver = ParameterResolver(root_obj)
# repeat=25, number=1
# 0.0031444959999999966
# -----
# repeat=5, number=100
# 0.30228340200000003
# -----
# repeat=5, number=1000
# 3.0495730029999995
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment