Created
March 14, 2023 14:54
-
-
Save antonymilne/11c18b8a4105c2e1faf1481c13e2a72f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Any, Dict, Optional

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.runner import SequentialRunner


class MissingOnlySequentialRunner(SequentialRunner):
    """Sequential runner that executes only the nodes needed for missing data.

    Instead of running the whole pipeline, ``run`` works out which data sets
    are not available yet and hands the parent runner a reduced pipeline
    containing just the nodes required to produce them.
    """

    def run(
        self, pipeline: Pipeline, catalog: DataCatalog, run_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the subset of ``pipeline`` that rebuilds unavailable data sets.

        Args:
            pipeline: The pipeline to select nodes from.
            catalog: The catalog used to decide which data sets are
                registered and which of those already exist.
            run_id: Identifier of the run; forwarded to the parent runner.

        Returns:
            The value returned by ``SequentialRunner.run`` for the reduced
            pipeline.
        """
        # Computed once: the same set of names drives every check below.
        registered = set(catalog.list())

        # Targets to build: pipeline outputs that are not registered in the
        # catalog, plus registered data sets the catalog reports as absent.
        free_outputs = pipeline.outputs() - registered
        missing = {ds for ds in registered if not catalog.exists(ds)}
        to_build = free_outputs | missing

        # Nodes that produce a target, and every node downstream of a target.
        to_rerun = pipeline.only_nodes_with_outputs(*to_build) + pipeline.from_inputs(
            *to_build
        )

        # We also need any memory data sets that feed into that selection,
        # including chains of memory data sets: they are not registered in
        # the catalog, so the nodes producing them have to be run again.
        memory_sets = pipeline.data_sets() - registered
        output_to_memory = pipeline.only_nodes_with_outputs(*memory_sets)
        input_from_memory = to_rerun.inputs() & memory_sets
        to_rerun += output_to_memory.to_outputs(*input_from_memory)

        # Pass the run id through instead of dropping it.
        # NOTE(review): assumes the parent ``run`` takes the run id as its
        # third positional parameter, mirroring this method's signature -
        # confirm against the installed Kedro version.
        return super().run(to_rerun, catalog, run_id)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment