Created
February 27, 2021 22:40
-
-
Save noxasaxon/ca78d8dc60e2057d1551eaf21145539a to your computer and use it in GitHub Desktop.
testing parameter resolver auto run jinja
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import timeit | |
# root_object = { | |
# "artifacts": { | |
# "event": { | |
# "details": { | |
# "firstname": "Sterling", | |
# "middlename": "Malory", | |
# "lastname": "Archer", | |
# "vault_test" : "vault:socless_vault_tests.txt" | |
# } | |
# } | |
# } | |
# } | |
# vault_pattern = re.compile(r"(vault:)(\S+(?=\s|$))(.*)") | |
import_module = """ | |
import re | |
VAULT_TOKEN = "vault:" | |
PATH_TOKEN = "$." | |
CONVERSION_TOKEN = "!" | |
from typing import Any | |
import json | |
from jinja2.nativetypes import NativeEnvironment | |
from jinja2 import select_autoescape, StrictUndefined | |
def fetch_from_vault(string_thing, content_only=True): | |
return string_thing | |
# Jinja Environment Configuration | |
#! this fails to escape <script>, escaping works with Environment | |
jinja_env = NativeEnvironment( | |
autoescape=select_autoescape( | |
["html", "xml"], default_for_string=True, default=True | |
), | |
variable_start_string="{", # this defines the start tokens for a jinja template | |
variable_end_string="}", # this is the end token for a jinja template | |
undefined=StrictUndefined, # This configures Jinjas behaviour when a template user provides an undefined reference | |
### StrictUndefined here ensures that if the user references something that | |
# Doesn't actually exist in the context, an error is raised | |
# Without StrictUndefined, invalid references fail silently | |
# More on undefined types here https://jinja.palletsprojects.com/en/2.11.x/api/#undefined-types | |
) | |
# Define Custom Filters | |
def maptostr(target_list): | |
return [str(each) for each in target_list] | |
def vault(vault_id: str): | |
# A custom jinja Function which returns the content of a socless vault | |
# we expect it to be called as {vault( context.vault_id) } and return the same value | |
# that current vault:vault-id would return | |
return fetch_from_vault(vault_id, content_only=True) | |
def fromjson(string_json: str) -> Any: | |
# This is a custom jinja Filter which expects stringified json and returns | |
# the output of calling json.loads on it. | |
return json.loads(string_json) | |
# Add Custom Functions | |
custom_functions = {"vault": vault, "fromjson": fromjson} | |
# Add Custom Filters | |
custom_filters = {"maptostr": maptostr, **custom_functions} | |
# Register Custom Filters | |
jinja_env.filters.update(custom_filters) | |
# Register Custom Functions | |
jinja_env.globals.update(custom_functions) | |
def render_jinja_from_string(template_string: str, root_object: dict) -> Any: | |
template_obj = jinja_env.from_string(template_string) | |
return template_obj.render(context=root_object) | |
############################################################################ | |
class OLDParameterResolver: | |
def __init__(self, root_obj): | |
self.root_obj = root_obj | |
def resolve_jsonpath(self, path): | |
_pre, _sep, post = path.partition(PATH_TOKEN) | |
keys = post.split(".") | |
obj_copy = self.root_obj.copy() | |
for key in keys: | |
try: | |
value = obj_copy.get(key) | |
except AttributeError: | |
raise SoclessException( | |
f"Unable to resolve key {key}, parent object does not exist. Full path: {path}" | |
) | |
if isinstance(value, str) and value.startswith(VAULT_TOKEN): | |
actual = self.resolve_vault_path(value) | |
else: | |
actual = value | |
obj_copy = actual | |
return obj_copy | |
def resolve_vault_path(self, path): | |
_, __, file_id = path.partition(VAULT_TOKEN) | |
data = fetch_from_vault(file_id, content_only=True) | |
return data | |
def resolve_reference(self, reference_path): | |
if not isinstance(reference_path, str): | |
if isinstance(reference_path, dict): | |
resolved_dict = {} | |
for key, value in list(reference_path.items()): | |
resolved_dict[key] = self.resolve_reference(value) | |
return resolved_dict | |
elif isinstance(reference_path, list): | |
resolved_list = [] | |
for item in reference_path: | |
resolved_list.append(self.resolve_reference(item)) | |
return resolved_list | |
else: | |
return reference_path | |
if reference_path.startswith(PATH_TOKEN): | |
reference, _, conversion = reference_path.partition(CONVERSION_TOKEN) | |
resolved = self.resolve_jsonpath(reference) | |
elif reference_path.startswith(VAULT_TOKEN): | |
reference, _, conversion = reference_path.partition(CONVERSION_TOKEN) | |
resolved = self.resolve_vault_path(reference) | |
else: | |
return reference_path | |
if conversion: | |
resolved = self.apply_conversion_from(resolved, conversion) | |
return resolved | |
def resolve_parameters(self, parameters): | |
actual_params = {} | |
for parameter, reference in list(parameters.items()): | |
actual_params[parameter] = self.resolve_reference(reference) | |
return actual_params | |
def apply_conversion_from(self, data, conversion): | |
print(data) | |
print(conversion) | |
if conversion == "json": | |
return json.loads(data) | |
class SoclessException(Exception): | |
pass | |
class ParameterResolver: | |
def __init__(self, root_obj): | |
self.root_obj = root_obj | |
def resolve_reference(self, reference_path): | |
if isinstance(reference_path, str): | |
return resolve_string_parameter(reference_path, self.root_obj) | |
elif isinstance(reference_path, dict): | |
resolved_dict = {} | |
for key, value in list(reference_path.items()): | |
resolved_dict[key] = self.resolve_reference(value) | |
return resolved_dict | |
elif isinstance(reference_path, list): | |
resolved_list = [] | |
for item in reference_path: | |
resolved_list.append(self.resolve_reference(item)) | |
return resolved_list | |
else: | |
return reference_path | |
def resolve_parameters(self, parameters): | |
actual_params = {} | |
for parameter, reference in parameters.items(): | |
actual_params[parameter] = self.resolve_reference(reference) | |
return actual_params | |
def convert_deprecated_vault_to_template(vault_reference) -> str: | |
reference, _, conversion = vault_reference.partition(CONVERSION_TOKEN) | |
_, _, file_id = reference.partition(VAULT_TOKEN) | |
template = f"vault('{file_id}')" | |
if conversion: | |
template = template + " |fromjson" | |
return "{" + template + "}" | |
def convert_legacy_reference_to_template(reference_path: str) -> str: | |
try: | |
template = reference_path | |
if template.startswith(PATH_TOKEN): | |
_, _, conversion = template.partition(CONVERSION_TOKEN) | |
template = f"context{template[1:]}" | |
if conversion: | |
template = template + " |fromjson" | |
template = "{" + template + "}" | |
elif template.startswith(VAULT_TOKEN): | |
template = convert_deprecated_vault_to_template(template) | |
return template | |
except (TypeError, KeyError) as e: | |
raise SoclessException( | |
f"Unable to convert reference type {type(reference_path)} to template - {e}" | |
) | |
def resolve_string_parameter(parameter: str, root_object: dict) -> Any: | |
template = convert_legacy_reference_to_template(parameter) | |
resolved = render_jinja_from_string(template, root_object) | |
if isinstance(resolved, str): | |
# if jsonpath renders into something with a vault_token, it needs to run through jinja again | |
if resolved.startswith(VAULT_TOKEN): | |
new_template_string = convert_deprecated_vault_to_template(resolved) | |
return render_jinja_from_string(new_template_string, root_object) | |
## autoescaping is currently disabled, this line may not be necessary | |
# resolved = resolved.replace(""", '"').replace("'", "'") | |
return resolved | |
###################_BEGIN_SETUP_################## | |
root_obj = { | |
"artifacts": { | |
"event": { | |
"details": { | |
"firstname": "Sterling", | |
"middlename": "Malory", | |
"lastname": "Archer", | |
"vault_test" : "vault:socless_vault_tests.txt" | |
} | |
} | |
} | |
} | |
parameters = { | |
"firstname": "$.artifacts.event.details.firstname", | |
"lastname": "$.artifacts.event.details.lastname", | |
"middlename": "Malory", | |
"vault.txt": "vault:socless_vault_tests.txt", | |
# "vault.json": "vault:socless_vault_tests.json!json", | |
"acquaintances": [ | |
{ | |
"firstname": "$.artifacts.event.details.middlename", | |
"lastname": "$.artifacts.event.details.lastname" | |
} | |
] | |
} | |
""" | |
testcode = """ | |
resolver = OLDParameterResolver(root_obj) | |
output = resolver.resolve_parameters(parameters) | |
""" | |
print(min(timeit.repeat(stmt=testcode, setup=import_module, repeat=25, number=1))) | |
## resolver = OLDParameterResolver(root_obj) | |
# repeat=25, number=1 | |
# 0.000019248000000082754 | |
# ----- | |
# repeat=2, number=10000 | |
# 0.12176976900000003 | |
# ----- | |
# repeat=5, number=100 | |
# .0014764200000000338 | |
# ----- | |
# repeat=5, number=1000 | |
# 0.012429356000000003 | |
# ___________________ | |
## resolver = ParameterResolver(root_obj) | |
# repeat=25, number=1 | |
# 0.0031444959999999966 | |
# ----- | |
# repeat=5, number=100 | |
# 0.30228340200000003 | |
# ----- | |
# repeat=5, number=1000 | |
# 3.0495730029999995 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment