Skip to content

Instantly share code, notes, and snippets.

@KrzysztofNawara
Created October 17, 2020 12:46
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Embed
What would you like to do?
import prefect
from prefect import task, Task, Flow, Parameter, unmapped
from prefect.engine.results import LocalResult
from datetime import timedelta
PIPELINE_VERSION=2
def printing_validator(cached_state, current_inputs, current_params):
logger = prefect.context.get("logger")
logger.info(f"\ncached inputs:{cached_state.cached_inputs}\ncurrent inputs: {current_inputs}")
return False
@task(cache_for=timedelta(days=1), cache_key=f"mapped1_{PIPELINE_VERSION}_0", cache_validator=printing_validator, result=LocalResult(dir="/tmp/prefect/v5", location="{task_name}_{map_index}"))
def mapped1():
print(f"executing mapped1")
return [f"m1_{x}" for x in range (0,2)]
@task(cache_for=timedelta(days=1), cache_key=f"i1_{PIPELINE_VERSION}_0", cache_validator=printing_validator, result=LocalResult(dir="/tmp/prefect/v5", location="{task_name}_{map_index}"))
def i1():
print(f"executing i1")
return "i1"
@task(cache_for=timedelta(days=1), cache_key=f"main_task_{PIPELINE_VERSION}_0", cache_validator=printing_validator, result=LocalResult(dir="/tmp/prefect/v5", location="{task_name}_{map_index}"))
def main_task(i1, mapped1):
print(f"executing main_task")
return "main_task"
with Flow(f"cache_missing_inputs_v{PIPELINE_VERSION}") as tf:
main_task.map(unmapped(i1()), mapped1())
#tf.run()
tf.register(project_name="kn_anlytics")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment