Skip to content

Instantly share code, notes, and snippets.

@schlarpc
Created December 31, 2023 08:00
Show Gist options
  • Save schlarpc/5d66b1d9aa4abd30dca974de976116be to your computer and use it in GitHub Desktop.
Save schlarpc/5d66b1d9aa4abd30dca974de976116be to your computer and use it in GitHub Desktop.
Work-in-progress encoding of the AWS Step Functions states language (Amazon States Language) in Python
import ast
import inspect
import hashlib
import json
import base64
import random
import re
import itertools
import time
import uuid
import contextlib
import datetime
from typing import TypeVar, TypeAlias, Iterable, Optional, Union, Mapping, Sequence, overload
# Recursive type aliases describing any JSON-compatible value.
# JSON must be bound first (using string forward references) because the
# container aliases below evaluate the name `JSON` when they are defined.
JSON: TypeAlias = Union["JSONObject", "JSONArray", "JSONNumber", str, bool, None]
# A JSON number: either an integer or a float.
JSONNumber: TypeAlias = int | float
# A JSON object: string keys mapping to arbitrary JSON values.
JSONObject: TypeAlias = dict[str, JSON]
# A JSON array: a list of arbitrary JSON values.
JSONArray: TypeAlias = list[JSON]
class StatesFlow:
    """Python stand-ins for the flow-control states of the states language.

    Only ``wait`` is implemented; the remaining states raise
    ``NotImplementedError``.
    """

    @overload
    def wait(self, seconds: JSONNumber, /) -> None:
        ...

    @overload
    def wait(self, timestamp: datetime.datetime, /) -> None:
        ...

    def wait(self, value: Union[JSONNumber, datetime.datetime], /) -> None:
        """Block the calling thread for a duration or until a timestamp.

        Args:
            value: Either a number of seconds to sleep, or a
                ``datetime.datetime`` to sleep until. Naive datetimes are
                interpreted as UTC; timezone-aware datetimes are compared in
                absolute time.

        Raises:
            TypeError: If ``value`` is neither a number nor a datetime.
            ValueError: If a negative number of whole seconds is requested
                (raised by ``time.sleep``).
        """
        # TODO support injecting time
        if isinstance(value, (int, float)):
            delta = value
        elif isinstance(value, datetime.datetime):
            if value.tzinfo is None or value.utcoffset() is None:
                # Naive timestamps keep their historical meaning: UTC.
                value = value.replace(tzinfo=datetime.timezone.utc)
            now = datetime.datetime.now(datetime.timezone.utc)
            # A timestamp that has already passed means "do not wait" rather
            # than a negative sleep, which time.sleep() rejects.
            delta = max(0.0, (value - now).total_seconds())
        else:
            raise TypeError(
                f"wait() expects a number of seconds or a datetime, got {type(value).__name__}"
            )
        # Fractions of a second are truncated, not rounded.
        time.sleep(int(delta))

    def succeed(self):
        """Succeed state; not implemented yet."""
        raise NotImplementedError()

    def fail(self, *, cause: Optional[str] = None, error: Optional[str] = None):
        """Fail state with an optional cause and error name; not implemented yet."""
        raise NotImplementedError()

    def map(self):
        """Map state; not implemented yet."""
        raise NotImplementedError()

    @contextlib.contextmanager
    def parallel(self):
        """Parallel state, intended to be used as a context manager; not implemented yet."""
        raise NotImplementedError()
class StatesIntrinsics:
    """Python emulation of the Step Functions ``States.*`` intrinsic functions.

    Every method is stateless. Numeric arguments are accepted as ints or
    floats and truncated with ``int()`` wherever an integer is required.
    """

    def array(self, *args: JSON) -> list[JSON]:
        """Return the positional arguments as a new list."""
        return list(args)

    def array_partition(self, array: list[JSON], chunk_size: JSONNumber, /) -> list[list[JSON]]:
        """Split ``array`` into consecutive chunks of at most ``chunk_size`` items.

        Raises:
            ValueError: If ``chunk_size`` is less than 1.
        """
        if chunk_size < 1:
            raise ValueError()
        size = int(chunk_size)
        return [array[start:start + size] for start in range(0, len(array), size)]

    def array_contains(self, array: list[JSON], value: JSON) -> bool:
        """Return whether ``value`` compares equal to any element of ``array``."""
        return value in array

    def array_range(self, first: JSONNumber, last: JSONNumber, increment: JSONNumber, /) -> list[int]:
        """Return the integers from ``first`` to ``last`` (inclusive) stepping by ``increment``.

        Raises:
            ValueError: If ``increment`` truncates to zero (raised by ``range``).
        """
        step = int(increment)
        # The end point is inclusive, so the exclusive stop that range() needs
        # lies one past ``last`` in the direction of travel.
        # NOTE(review): descending ranges are treated as inclusive too;
        # confirm against the real service.
        stop = int(last - 1) if step < 0 else int(last + 1)
        return list(range(int(first), stop, step))

    def array_get_item(self, array: list[JSON], index: JSONNumber, /) -> JSON:
        """Return the element of ``array`` at position ``index``."""
        return array[int(index)]

    def array_length(self, array: list[JSON]) -> int:
        """Return the number of elements in ``array``."""
        return len(array)

    def _array_unique_generator(self, array: list[JSON]) -> Iterable[JSON]:
        """Yield the first occurrence of each distinct value in ``array``.

        Values are compared through their canonical JSON text (sorted keys),
        which also makes unhashable lists and dicts comparable.
        """
        seen: set[str] = set()
        for value in array:
            hashable = json.dumps(value, sort_keys=True)
            if hashable not in seen:
                seen.add(hashable)
                yield value

    def array_unique(self, array: list[JSON], /) -> list[JSON]:
        """Return ``array`` with duplicate values removed, keeping first occurrences."""
        # does not return in same order as step functions
        return list(self._array_unique_generator(array))

    def base64_encode(self, value: str, /) -> str:
        """Return the standard Base64 encoding of ``value``'s UTF-8 bytes."""
        return base64.b64encode(value.encode("utf-8")).decode('ascii')

    def base64_decode(self, value: str, /) -> str:
        """Decode Base64 text (standard or URL-safe alphabet) into a string.

        Characters outside the Base64 alphabet, including ``=`` padding and
        whitespace, are ignored. Bytes that are not valid UTF-8 are replaced
        with U+FFFD.

        Raises:
            binascii.Error: If the remaining data cannot be valid Base64.
        """
        normalized = value.replace('+', '-').replace('/', '_')
        cleaned = re.sub(r'[^A-Za-z0-9_-]', '', normalized)
        # The filter above also removed any '=' padding; restore it so the
        # decoder accepts inputs whose length is not a multiple of four.
        padded = cleaned + '=' * (-len(cleaned) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')

    def hash(self, data: str, algorithm: str, /) -> str:
        """Return the hex digest of ``data`` (UTF-8 encoded) under ``algorithm``.

        Args:
            data: Text to hash.
            algorithm: One of ``MD5``, ``SHA-1``, ``SHA-256``, ``SHA-384``,
                ``SHA-512``.

        Raises:
            ValueError: If ``algorithm`` is not one of the supported names.
        """
        algorithms = {
            "MD5": hashlib.md5,
            "SHA-1": hashlib.sha1,
            "SHA-256": hashlib.sha256,
            "SHA-384": hashlib.sha384,
            "SHA-512": hashlib.sha512,
        }
        if algorithm not in algorithms:
            raise ValueError()
        return algorithms[algorithm](data.encode("utf-8")).hexdigest()

    def json_merge(self, object1: JSONObject, object2: JSONObject, deep_merge: JSON = False, /) -> JSONObject:
        """Return a shallow merge of two objects; keys in ``object2`` win.

        Raises:
            ValueError: If ``deep_merge`` is exactly ``True``.
        """
        # deep_merge has no type validation, and only true raises as unimplemented
        if deep_merge is True:
            raise ValueError()
        return {**object1, **object2}

    def string_to_json(self, value: str) -> JSON:
        """Parse ``value`` as JSON text."""
        # returns null if input string is just whitespace
        if not value.strip():
            return None
        return json.loads(value)

    def math_random(self, start: JSONNumber, end: JSONNumber, seed: Optional[JSONNumber]=None, /) -> int:
        """Return a random integer between ``start`` and ``end``, both inclusive.

        Args:
            start: Lower bound (truncated to an int).
            end: Upper bound (truncated to an int).
            seed: Optional seed making the result reproducible.
        """
        rng = random.Random()
        # same seed does not return same results as step functions (different algorithm)
        if seed is not None:
            rng.seed(int(seed))
        return rng.randint(int(start), int(end))

    def math_add(self, value1: JSONNumber, value2: JSONNumber, /) -> int:
        """Return the sum of the two values after truncating each to an int."""
        return int(value1) + int(value2)

    def string_split(self, input_string: str, splitter: str, /) -> list[str]:
        """Split ``input_string`` on every character contained in ``splitter``."""
        # each character of the splitter is an independent delimiter
        parts = re.split("|".join(re.escape(c) for c in splitter), input_string)
        # empty strings are omitted from the result
        return list(filter(None, parts))

    def uuid(self, /) -> str:
        """Return a random (version 4) UUID as a lowercase string."""
        return str(uuid.uuid4()).lower()

    def _format_arg_to_string(self, arg: JSON) -> str:
        """Render one ``format`` argument as the text that is interpolated.

        Strings pass through unchanged, numbers/booleans/null use their JSON
        spelling, lists use compact JSON, and top-level dicts use a
        ``{key=value, ...}`` rendering.
        """
        if isinstance(arg, (int, float, bool)) or arg is None:
            return json.dumps(arg)
        elif isinstance(arg, str):
            return arg
        elif isinstance(arg, list):
            # yes, this means that dicts render differently when nested inside a list. it's weird.
            return json.dumps(arg, separators=(",", ":"))
        elif isinstance(arg, dict):
            pairs = (
                self._format_arg_to_string(k) + '=' + self._format_arg_to_string(v)
                for k, v in arg.items()
            )
            return '{' + ', '.join(pairs) + '}'
        raise NotImplementedError()

    def _format_template_compose(self, template: str, formatted_args: list[str]) -> str:
        """Substitute ``formatted_args`` into the ``{}`` placeholders of ``template``.

        A backslash escapes the token that follows it, so ``\\{`` and ``\\}``
        produce literal braces.

        Raises:
            ValueError: On unbalanced braces, or when the number of
                placeholders differs from the number of arguments.
        """
        buffer = []
        escape_active = False
        interp_active = False
        # index of the next argument to interpolate; the caller's list is not modified
        next_arg = 0
        # split template into "control characters" and "spans of non-control characters"
        # the control characters are processed by the state machine loop, while spans pass through
        parts = (match.group() for match in re.finditer(r'[^\\{}]+|.', template))
        for part in parts:
            if escape_active:
                buffer.append(part)
                escape_active = False
            elif part == "\\":
                escape_active = True
            elif part == "}":
                if not interp_active:
                    raise ValueError("matching '{' not found for '}'.")
                if next_arg >= len(formatted_args):
                    raise ValueError("number of arguments do not match the occurrences of {}")
                buffer.append(formatted_args[next_arg])
                next_arg += 1
                interp_active = False
            elif interp_active:
                # anything other than '}' directly after '{' is malformed
                raise ValueError("matching '}' not found for '{'")
            elif part == "{":
                interp_active = True
            else:
                buffer.append(part)
        if interp_active:
            raise ValueError("matching '}' not found for '{'")
        if next_arg != len(formatted_args):
            raise ValueError("number of arguments do not match the occurrences of {}")
        return "".join(buffer)

    def format(self, template: str, /, *args: JSON) -> str:
        """Interpolate ``args`` into the ``{}`` placeholders of ``template``.

        Raises:
            ValueError: If the template is malformed or the placeholder and
                argument counts differ.
        """
        return self._format_template_compose(template, [self._format_arg_to_string(arg) for arg in args])
if __name__ == "__main__":
    # Ad-hoc smoke test, run only when the file is executed as a script so
    # that importing the module has no side effects.
    # The template below ends with an unescaped '{' that is never closed, so
    # format() raises ValueError("matching '}' not found for '{'") before
    # anything is printed.
    rendered = StatesIntrinsics().format("hello fat\\{{}\\}her{", 1)
    print(json.dumps(rendered))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment