Skip to content

Instantly share code, notes, and snippets.

@royerk
Created August 5, 2022 21:12
Show Gist options
  • Save royerk/a5b8c3488af93bcdd3a8520b24567982 to your computer and use it in GitHub Desktop.
Save royerk/a5b8c3488af93bcdd3a8520b24567982 to your computer and use it in GitHub Desktop.
from datetime import timedelta

from airflow import DAG
from airflow.decorators import task
from google.cloud import aiplatform
# Environment name; interpolated into the spec URI, project id, DAG id and
# pipeline display name so each environment gets its own resources.
ENV = "my-env-name"

# --- Vertex AI pipeline settings -------------------------------------------
# Location of the pipeline spec JSON handed to PipelineJob as template_path.
PIPELINE_SPEC_URI = f"gs://bucket/something/something/pipeline-spec-{ENV}.json"
PROJECT_ID = f"my-project-id-{ENV}"
REGION = "moon-southpole1"
# Plain literal: nothing is interpolated here, so no f-string prefix.
PIPELINE_ROOT = "gs://some-bucket/"

# --- Airflow DAG settings --------------------------------------------------
dag_name = f"some-name-{ENV}"
dag_description = "Submit Vertex AI pipeline"
# NOTE(review): placeholder text, not a valid schedule -- replace with a real
# cron expression before deploying.
dag_schedule = "your-5-stars-cron-here"
# NOTE(review): the Ellipsis values below are template placeholders; fill in
# real values (presumably a datetime, an int and a timedelta -- confirm).
default_args = {
    "start_date": ...,
    "retries": ...,
    "retry_delay": ...,
}
# DAG with a single task that submits the pipeline spec as a Vertex AI
# PipelineJob. `timedelta` comes from the standard-library `datetime` module
# and must be imported at the top of the file.
with DAG(
    dag_id=dag_name,
    default_args=default_args,
    description=dag_description,
    schedule_interval=dag_schedule,
    # Fail the DAG run if it is still going after three hours.
    dagrun_timeout=timedelta(hours=3),
    # At most one DAG run, and one task instance, at a time.
    max_active_runs=1,
    concurrency=1,
) as dag:

    @task(task_id="Submit-my-pipeline")
    def submit_pipeline(**kwargs):
        """Submit the pipeline to Vertex AI and return a status string.

        Initialises the Vertex AI SDK for ``PROJECT_ID`` / ``REGION``, builds
        a ``PipelineJob`` from the spec at ``PIPELINE_SPEC_URI`` and submits
        it.

        Args:
            **kwargs: Accepted but not used by this function.

        Returns:
            The literal string ``"Job submitted"``.
        """
        aiplatform.init(
            project=PROJECT_ID,
            location=REGION,
        )
        job = aiplatform.PipelineJob(
            display_name=f"scheduled-run-of-my-pipeline-{ENV}",
            template_path=PIPELINE_SPEC_URI,
            pipeline_root=PIPELINE_ROOT,
            # Caching is disabled for every scheduled run.
            enable_caching=False,
            parameter_values={"env": ENV},
        )
        # NOTE(review): per the Vertex AI SDK docs, submit() does not wait
        # for the pipeline to finish, so this task presumably succeeds as
        # soon as the job is accepted -- confirm that is the intent.
        job.submit()
        return "Job submitted"

    # Calling the decorated function registers the task on the DAG.
    run_this = submit_pipeline()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment