Skip to content

Instantly share code, notes, and snippets.

@cicdw
Last active April 6, 2020 20:59
Show Gist options
  • Save cicdw/0f1cf6ad6c8d0723b83bf8545866a46d to your computer and use it in GitHub Desktop.
Save cicdw/0f1cf6ad6c8d0723b83bf8545866a46d to your computer and use it in GitHub Desktop.
Proposed Result interface
"""
The new Prefect Result interface is a mechanism for interacting with / retrieving data from possibly third party locations
in a secure manner. Results also provide convenient hooks for caching.
"""
class Result:
def __init__(self, **kwargs):
"""
Initialize the class.
"""
def hydrate(self, result: Result) -> "Result":
"""
This should be the "hydration" step that we should call as early as possible with the task's .result attribute
This method should pass `self.filepath` to result.read() and return a newly hydrated result object.
"""
pass
def read(self, loc: str = None) -> "Result":
"""
Reads data from the "remote" location; if loc is provided, will become the .filename / .filepath attribute for a new
result class. If not provided, the current attribute is used and self is returned.
"""
pass
def write(self, value: Any, **kwargs) -> "Result":
"""
This method takes two arguments:
- value: the value to be written
- **kwargs: values used to format the filename template for this result
Should return a _new_ result object with the appropriately formatted filepath.
"""
pass
def validate(self) -> bool:
"""
Run any validator functions associated with this result and return whether the result is valid or not.
All individual validator functions must return True for this method to return True.
Emits a warning log if run_validators isn't true, and proceeds to run validation functions anyway.
Returns:
- bool: whether or not the Result passed all validation functions
"""
pass
def copy(self) -> "Result":
"""
Return a copy of the current result object.
"""
return copy.copy(self)
def serialize(self) -> dict:
"""
Serializes the result value into Cloud JSON representation.
Returns:
- dict: the serialized result value
"""
pass
@classmethod
def deserialize(cls, self) -> bytes:
"""
Deserializes a JSON representation of a Result class into a real result class using our schemas.
Returns:
- Result: the deserialized result value
"""
pass
def serialize_to_bytes(self) -> bytes:
"""
Serializes the result value into bytes.
Returns:
- bytes: the serialized result value
"""
return base64.b64encode(cloudpickle.dumps(self.value))
@classmethod
def deserialize_from_bytes(cls, serialized_value: Union[str, bytes]) -> Any:
"""
Takes a given serialized result value and returns a deserialized value.
Args:
- serialized_value (str): The serialized result value
Returns:
- Any: the deserialized result value
"""
return cloudpickle.loads(base64.b64decode(serialized_value))
def format(self, **kwargs: Any) -> "Result":
"""
Takes a set of string format key-value pairs and renders the result.filepath_template to a final filepath string
Args:
- **kwargs (Any): string format arguments for result.filepath_template
Returns:
- Any: the current result instance
"""
pass
@classmethod
def exists(cls, self) -> bool:
"""
Checks whether the target result exists.
Does not validate whether the result is `valid`, only that it is present.
Returns:
- bool: whether or not the target result exists.
"""
raise NotImplementedError()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment