Skip to content

Instantly share code, notes, and snippets.

@mzjp2

mzjp2/skip_hook.py

Created Oct 1, 2020
Embed
What would you like to do?
A prototype of a hook that will skip a node run based on the last modified time of input and output datasets
import logging
from kedro.framework.hooks import hook_impl
from kedro.io import MemoryDataSet
from kedro.io.core import get_filepath_str
class SkipHook:
    """Kedro hook that skips a node whose outputs are newer than its inputs.

    Before each node runs, the last-modified times of the node's input and
    output datasets are compared.  When every output was modified strictly
    after the most recently modified input, ``node.run`` is replaced with a
    stub that logs a message and returns an empty dict instead of executing
    the node function.

    The node is always run (never skipped) when:

    * it has no outputs, or no inputs other than parameters;
    * any input or output is a ``MemoryDataSet``;
    * the modification time of any input or output cannot be determined
      (missing file, or a dataset that is not file-backed).

    NOTE(review): this relies on private dataset attributes (``_fs``,
    ``_protocol``, ``_get_load_path``, ``_get_save_path``); ``_fs`` is
    presumably an fsspec filesystem exposing ``modified(path)`` -- confirm
    against the dataset classes actually used in the catalog.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _modified_times(datasets, path_getter):
        """Return the modification time of every dataset, or ``None``.

        Args:
            datasets: Dataset objects taken from the catalog.
            path_getter: Name of the dataset method that yields its path,
                either ``"_get_load_path"`` or ``"_get_save_path"``.

        Returns:
            A list with one modification time per dataset, or ``None`` as
            soon as one of them cannot be determined.
        """
        times = []
        for dataset in datasets:
            resolve_path = getattr(dataset, path_getter, None)
            filesystem = getattr(dataset, "_fs", None)
            protocol = getattr(dataset, "_protocol", None)
            if resolve_path is None or filesystem is None or protocol is None:
                # Not a file-backed dataset: staleness cannot be judged.
                return None
            path = get_filepath_str(resolve_path(), protocol)
            try:
                # Each dataset is queried through its OWN filesystem; inputs
                # and outputs may live on different storage backends.
                times.append(filesystem.modified(path))
            except FileNotFoundError:
                return None
        return times

    @hook_impl
    def before_node_run(self, node, catalog):
        """Replace ``node.run`` with a no-op when the outputs are up to date.

        Args:
            node: The node about to be run; its ``inputs``, ``outputs``,
                ``name`` and ``run`` attributes are used.
            catalog: The data catalog; datasets are looked up by name as
                attributes of ``catalog.datasets``.
        """
        outputs = [getattr(catalog.datasets, name) for name in node.outputs]
        inputs = [
            getattr(catalog.datasets, name)
            for name in node.inputs
            # Parameters are not file-backed datasets, so they are ignored.
            if name != "parameters" and not name.startswith("params:")
        ]

        # Without outputs there is nothing to be up to date; without file
        # inputs there is no timestamp to compare against, so run the node.
        if not outputs or not inputs:
            return
        if any(isinstance(dataset, MemoryDataSet) for dataset in (*outputs, *inputs)):
            return

        output_times = self._modified_times(outputs, "_get_save_path")
        if output_times is None:
            return
        input_times = self._modified_times(inputs, "_get_load_path")
        if input_times is None:
            return

        # Some input is at least as new as the oldest output: outputs are stale.
        if max(input_times) >= min(output_times):
            return

        def _fake_run(*args, **kwargs):
            """A fake node run method"""
            self.logger.info(f"Skip node {node.name}")
            return {}

        # NOTE: this rebinds ``run`` on the node object itself, so the stub
        # stays in place for any later use of the same node instance.
        node.run = _fake_run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment