Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created October 18, 2021 13:26
Show Gist options
  • Save anna-geller/eda4b42517793db583a1c0c8b625d6e5 to your computer and use it in GitHub Desktop.
Save anna-geller/eda4b42517793db583a1c0c8b625d6e5 to your computer and use it in GitHub Desktop.
import logging
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun
logger = logging.getLogger(__name__)
@task
def calculate_flow_end_date(end_date: str):
if end_date is not None:
return end_date
return prefect.context.get('scheduled_start_time').to_date_string()
common_flow = StartFlowRun(
flow_name='child',
project_name='community',
wait=True,
)
with Flow("flow-of-flows-with-params") as flow:
num_days_parameter = Parameter('num_days', default=1)
num_back_fill_days_parameter = Parameter('num_back_fill_days', default=1)
end_date_parameter = Parameter('end_date', default=None)
task_end_date = calculate_flow_end_date(end_date_parameter)
logger.info('Task End Date: %s', task_end_date)
logger.info('Num Days: %s', num_days_parameter)
logger.info('Num Days Backfill: %s', num_back_fill_days_parameter)
common_flow_result = common_flow(parameters={
'num_days': num_days_parameter,
'num_back_fill_days': num_back_fill_days_parameter,
'end_date': task_end_date,
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment