Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
# Google Cloud project and region handed to the AI Platform client below.
REGION = "us-central1"
PROJECT_ID = "avolkov-31337"
# Cloud Storage URI passed as `pipeline_root` when the run is created.
PIPELINE_ROOT = "gs://avolkov/tmp/vertex"
from typing import NamedTuple
import kfp
from kfp.components import load_component_from_url, InputPath, OutputPath, create_component_from_func
from kfp.v2.google import experimental
@create_component_from_func
def print_cluster_spec_op():
    """Print the CLUSTER_SPEC and TF_CONFIG environment variables.

    Each value is preceded by a label line naming the variable.

    Raises:
        KeyError: If either environment variable is not set.
    """
    # Kept as a function-local import, as in the original, so the function
    # body brings its own dependencies with it.
    import os

    for label, variable in (
        ("CLUSTER_SPEC:", "CLUSTER_SPEC"),
        ("TF_CONFIG:", "TF_CONFIG"),
    ):
        # Label first, then the lookup: a missing variable fails only after
        # its label has already been printed.
        print(label)
        print(os.environ[variable])
@kfp.dsl.pipeline(name="my-pipeline")
def my_pipeline():
    """Define a one-step pipeline that prints the cluster environment.

    The single task is created from ``print_cluster_spec_op`` and then passed
    to ``experimental.run_as_aiplatform_custom_job`` with ``replica_count=5``.
    """
    # NOTE(review): presumably this makes the step run as an AI Platform
    # custom job with five replicas -- confirm against the KFP SDK docs.
    task = print_cluster_spec_op()
    experimental.run_as_aiplatform_custom_job(task, replica_count=5)
# Alias for the pipeline that the script entry point compiles and submits.
pipeline_func = my_pipeline


if __name__ == "__main__":
    # The kfp.v2 compiler and client are imported only when run as a script.
    from kfp.v2 import compiler as compiler_v2
    from kfp.v2.google.client import AIPlatformClient as Client_v2

    # The compiled job spec is named after the pipeline function.
    pipeline_file = f"{pipeline_func.__name__}.json"

    # Step 1: compile the pipeline function into a JSON job spec file.
    compiler_v2.Compiler().compile(
        pipeline_func=pipeline_func,
        package_path=pipeline_file,
    )

    # Step 2: create a run from that job spec in the configured project/region.
    api_client = Client_v2(project_id=PROJECT_ID, region=REGION)
    response = api_client.create_run_from_job_spec(
        job_spec_path=pipeline_file,
        pipeline_root=PIPELINE_ROOT,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment