Skip to content

Instantly share code, notes, and snippets.

@masaki925
Last active August 2, 2022 06:37
Show Gist options
  • Save masaki925/01e167d392eef49c324dc070e00a8823 to your computer and use it in GitHub Desktop.
Save masaki925/01e167d392eef49c324dc070e00a8823 to your computer and use it in GitHub Desktop.
Vertex AI Pipelines: minimal example code
import kfp
from kfp.v2 import compiler
from google.cloud import aiplatform
from my_components import _hoge_op, _fuga_op
# ------------------------------------------------------------------------------
# CONSTANTS
# ------------------------------------------------------------------------------
# Used as the pipeline name, the job display name, and (with "-" replaced by
# "_") as part of the compiled spec's file name.
PIPELINE_NAME = "test"

# Google Cloud project and region the pipeline job is submitted to.
GCP_PROJECT_ID = "my-project"  # Enter your GCP project ID
GCP_LOCATION = "asia-northeast1"

# Bucket name only; the full gs:// URI is derived from it below.
GCP_GCS_PIPELINE_ROOT = "my_root"  # Enter your GCS bucket without gs://
GCP_GCS_PIPELINE_ROOT_PATH = "gs://{}/".format(GCP_GCS_PIPELINE_ROOT)
# PIPELINE
# ------------------------------------------------------------------------------
@kfp.dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=GCP_GCS_PIPELINE_ROOT_PATH,
)
def kfp_pipeline():
    """Define a two-step pipeline: ``_hoge_op`` followed by ``_fuga_op``.

    The output of the first step is wired into the ``a`` input of the second
    step, which also makes the second step depend on the first.
    """
    hoge = _hoge_op()
    # Pass the input by keyword: newer KFP SDK releases reject positional
    # arguments when instantiating a component, and the keyword form works on
    # older releases as well.
    _fuga_op(a=hoge.output)
if __name__ == "__main__":
    # Compile the pipeline function into a JSON spec. The file name is derived
    # from the pipeline name with hyphens replaced by underscores.
    safe_name = PIPELINE_NAME.replace("-", "_")
    spec_path = f"kfp_{safe_name}.json"
    compiler.Compiler().compile(pipeline_func=kfp_pipeline, package_path=spec_path)

    # Create a Vertex AI pipeline job from the compiled spec and run it with
    # caching disabled.
    pipeline_job = aiplatform.PipelineJob(
        display_name=PIPELINE_NAME,
        project=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        template_path=spec_path,
        pipeline_root=GCP_GCS_PIPELINE_ROOT_PATH,
        enable_caching=False,
    )
    pipeline_job.run()
from kfp.v2.dsl import component
@component
def _hoge_op() -> str:
    """Return the constant string ``"hoge"`` as this component's output."""
    result = "hoge"
    return result
@component
def _fuga_op(a: str) -> str:
    """Print the received string and pass it through as the output.

    Args:
        a: String input to print.

    Returns:
        The same string ``a``.
    """
    # NOTE: Vertex AI runner has Python 3.7 (can't use f"{a=}" syntax).
    print(f"a: {a}")
    # The signature declares a ``str`` output, so return one explicitly
    # instead of implicitly returning None.
    return a
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment