Skip to content

Instantly share code, notes, and snippets.

@antonymilne
Created March 14, 2023 14:54
Show Gist options
  • Save antonymilne/11c18b8a4105c2e1faf1481c13e2a72f to your computer and use it in GitHub Desktop.
Save antonymilne/11c18b8a4105c2e1faf1481c13e2a72f to your computer and use it in GitHub Desktop.
from typing import Any, Dict, Optional

from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from kedro.runner import SequentialRunner
class MissingOnlySequentialRunner(SequentialRunner):
    """``SequentialRunner`` that only re-runs the parts of a pipeline with missing data.

    A dataset "needs building" when it is either

    * a pipeline output that has no catalog entry (it would only live in memory), or
    * a catalog dataset used by the pipeline for which ``catalog.exists`` is False.

    The nodes producing those datasets, plus every node downstream of them, are
    re-run. Nodes that produce uncatalogued (memory-only) inputs for the re-run
    nodes are added too, following chains of memory-only datasets, because that
    data is not persisted between runs.
    """

    def run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        run_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Run only the nodes of ``pipeline`` needed to rebuild missing data.

        Args:
            pipeline: The full pipeline to filter.
            catalog: Catalog used both to detect missing data and to run nodes.
            run_id: Forwarded to ``SequentialRunner.run``.

        Returns:
            The result of ``SequentialRunner.run`` on the filtered pipeline.
        """
        to_rerun = self._missing_only_pipeline(pipeline, catalog)
        # Previously run_id was accepted but silently dropped. Forward it.
        # NOTE(review): assumes the Kedro base signature is
        # run(pipeline, catalog, run_id=None), matching this override — confirm.
        return super().run(to_rerun, catalog, run_id)

    @staticmethod
    def _missing_only_pipeline(pipeline: Pipeline, catalog: DataCatalog) -> Pipeline:
        """Return the sub-pipeline of ``pipeline`` that must run to rebuild missing data."""
        catalogued = set(catalog.list())
        # Datasets with no catalog entry are memory-only and never persist.
        memory_sets = pipeline.data_sets() - catalogued
        free_outputs = pipeline.outputs() - catalogued
        # Only check datasets this pipeline actually uses. only_nodes_with_outputs
        # and from_inputs raise ValueError for names the pipeline does not contain,
        # and exists() on unrelated catalog entries is wasted I/O.
        missing = {
            ds for ds in pipeline.data_sets() & catalogued if not catalog.exists(ds)
        }
        to_build = free_outputs | missing

        # Producers of what needs building, plus everything downstream of it.
        to_rerun = pipeline.only_nodes_with_outputs(*to_build) + pipeline.from_inputs(
            *to_build
        )

        # Re-run nodes may consume memory-only datasets, which must be recomputed.
        # to_outputs walks upstream within the memory-producing nodes, so chains
        # of memory datasets are included.
        output_to_memory = pipeline.only_nodes_with_outputs(*memory_sets)
        input_from_memory = to_rerun.inputs() & memory_sets
        return to_rerun + output_to_memory.to_outputs(*input_from_memory)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment