Skip to content

Instantly share code, notes, and snippets.

@bobpeers
Last active May 15, 2023 23:41
Show Gist options
  • Save bobpeers/f602d32aae749b021fdb3df51128e5bd to your computer and use it in GitHub Desktop.
Save bobpeers/f602d32aae749b021fdb3df51128e5bd to your computer and use it in GitHub Desktop.
Template to schedule KNIME workflows using Prefect
#prefect imports
import prefect
from prefect import task, Flow
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.storage import Local
import pendulum
import knime
import os
# --- Template settings: replace the placeholder values below ---

# Prefect-side settings: the flow's name, the project it is registered
# under, and the script path handed to Local storage.
FLOW_NAME: str = 'Your Flow name'
PROJECT: str = 'Operations'
PY_SCRIPT: str = 'full_path_to_this_script.py'

# KNIME-side settings: the workspace base path and the workflow names that
# are joined onto it.
KNIME_BASE: str = 'Path/to/your/knime/workspace/'
KNIME_FLOWS = ['Gofact Loader']

# Module-level placeholder; rebound to each joined workflow path while the
# flow is being built.
knime_flow: str = ''
@task(max_retries=2, retry_delay=pendulum.duration(seconds=30))
def run_knime(workflow: str) -> None:
    """Execute one KNIME workflow as a Prefect task.

    The task decorator is configured with ``max_retries=2`` and a
    30-second ``retry_delay``.

    Args:
        workflow: Path of the KNIME workflow; passed unchanged to
            ``knime.Workflow``.
    """
    with knime.Workflow(workflow) as knime_workflow:
        knime_workflow.execute()
# Cron schedule: run at 05:00 every day. The clock's start_date is taken
# from pendulum.now() each time this script is executed.
schedule = Schedule(clocks=[CronClock("0 5 * * *", start_date=pendulum.now())])

# Build the flow. It uses Local storage pointing at PY_SCRIPT with
# stored_as_script=True, i.e. the flow is kept as this script file.
with Flow(
    FLOW_NAME,
    schedule,
    storage=Local(path=PY_SCRIPT, stored_as_script=True),
) as flow:
    # NOTE(review): plain Python statements inside this block run while the
    # flow is being built, so the logger calls below are emitted at build
    # time rather than when the tasks execute -- confirm this is intended.
    logger = prefect.context.get("logger")

    # NOTE(review): `gofact` is not defined or imported in this file;
    # presumably it is a Prefect task that downloads the GoFact files and
    # must be supplied before this template can run.
    gofact_process = gofact()
    logger.info('Downloaded GoFact files')

    # Add one run_knime task per configured workflow, each one downstream
    # of the gofact task.
    for f in KNIME_FLOWS:
        knime_flow = os.path.join(KNIME_BASE, f)
        logger.info(f'Running {knime_flow}')
        run_knime(knime_flow, upstream_tasks=[gofact_process])

# Register the flow under the project named by PROJECT. Guarded so that
# registration only happens when this file is run directly, not when the
# script is re-executed to load the stored flow.
if __name__ == "__main__":
    flow.register(project_name=PROJECT)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment