https://cloud.google.com/dataproc/docs/concepts/workflows/using-workflows
-
Set environment variables
export REGION=asia-east1
export TEMPLATE_ID=workflow-template-demo
export BUCKET_NAME=dataproc-workflow-demo
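If the bucket does not exist yet, it can be created in the same region first (a minimal sketch, assuming your default project is already configured):
gsutil mb -l $REGION gs://$BUCKET_NAME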
-
Create the workflow template
gcloud dataproc workflow-templates create $TEMPLATE_ID --region $REGION
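At any point you can inspect the template's current definition (printed as YAML) with describe:
gcloud dataproc workflow-templates describe $TEMPLATE_ID --region $REGION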
-
Attach a managed cluster to the workflow template
gcloud beta dataproc workflow-templates set-managed-cluster $TEMPLATE_ID \
    --region $REGION \
    --cluster-name spark-notebook \
    --image-version 1.4
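set-managed-cluster also accepts the usual sizing flags from gcloud dataproc clusters create. As a sketch, the machine types and worker count below are illustrative choices, not values from this walkthrough:
gcloud beta dataproc workflow-templates set-managed-cluster $TEMPLATE_ID \
    --region $REGION \
    --cluster-name spark-notebook \
    --image-version 1.4 \
    --master-machine-type n1-standard-2 \
    --worker-machine-type n1-standard-2 \
    --num-workers 2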
-
Create a job script named pyspark-job.py
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("yarn")
         .appName("demo")
         .config("spark.sql.broadcastTimeout", "36000")
         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .getOrCreate())

# Replace <<BUCKET_NAME>> with your bucket; count the rows in the Avro file.
path = "gs://<<BUCKET_NAME>>/twitter.avro"
count = spark.read.format("avro").load(path).count()
print(count)
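To smoke-test the script before wiring it into the workflow, it can be submitted directly with spark-submit; a minimal sketch, assuming a local Spark 2.4 installation with Scala 2.11 (matching the Avro package used in the workflow step below):
spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-job.py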
-
Upload the job script to GCS
gsutil cp pyspark-job.py gs://$BUCKET_NAME/
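To verify the upload landed where the workflow step expects it:
gsutil ls gs://$BUCKET_NAME/pyspark-job.py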
-
Add the job to the workflow template as a step
export STEP_ID=step_pyspark
gcloud dataproc workflow-templates add-job pyspark \
    gs://$BUCKET_NAME/pyspark-job.py \
    --step-id $STEP_ID \
    --workflow-template $TEMPLATE_ID \
    --region $REGION \
    --properties spark.jars.packages='org.apache.spark:spark-avro_2.11:2.4.0'
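A workflow can chain multiple steps into a DAG with --start-after. As a sketch, the second script name below (cleanup-job.py) is hypothetical; the step would run only after step_pyspark succeeds:
gcloud dataproc workflow-templates add-job pyspark \
    gs://$BUCKET_NAME/cleanup-job.py \
    --step-id step_cleanup \
    --start-after $STEP_ID \
    --workflow-template $TEMPLATE_ID \
    --region $REGION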
-
Run the workflow template (this command returns an operation-id, which can be used to monitor the job's status or to cancel it)
gcloud dataproc workflow-templates instantiate $TEMPLATE_ID --region=$REGION
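With the returned operation-id, the run can be polled or cancelled, for example:
gcloud dataproc operations describe <operation-id> --region=$REGION
gcloud dataproc operations cancel <operation-id> --region=$REGION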
-
Delete the workflow template
gcloud dataproc workflow-templates delete $TEMPLATE_ID --region=$REGION
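To confirm the template is gone, list the remaining templates in the region:
gcloud dataproc workflow-templates list --region $REGION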