databricks_job_task resource WIP
diff --git a/compute/model.go b/compute/model.go
index 1a2b43a2..b71afb6a 100644
--- a/compute/model.go
+++ b/compute/model.go
@@ -612,6 +612,21 @@ func (j Job) ID() string {
 	return fmt.Sprintf("%d", j.JobID)
 }
 
+func (j Job) hasTask(taskKey string) bool {
+	if j.Settings == nil {
+		return false
+	}
+	if !j.Settings.isMultiTask() {
+		return false
+	}
+	for _, task := range j.Settings.Tasks {
+		if task.TaskKey == taskKey {
+			return true
+		}
+	}
+	return false
+}
+
 // RunParameters ...
 type RunParameters struct {
 	// a shortcut field to reuse this type for RunNow
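
A quick sanity check for the new helper; a minimal test sketch, assuming isMultiTask() treats Format == "MULTI_TASK" as multi-task and that stretchr/testify is available as a test dependency:

package compute

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestJobHasTask(t *testing.T) {
	j := Job{
		Settings: &JobSettings{
			Format: "MULTI_TASK",
			Tasks:  []JobTaskSettings{{TaskKey: "a"}},
		},
	}
	assert.True(t, j.hasTask("a"))
	assert.False(t, j.hasTask("b"))
	// a job with nil settings must not panic
	assert.False(t, Job{}.hasTask("a"))
}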
diff --git a/provider/provider.go b/provider/provider.go
index 0f7b4f9e..eae77bd5 100644
--- a/provider/provider.go
+++ b/provider/provider.go
@@ -51,6 +51,7 @@ func DatabricksProvider() *schema.Provider {
 			"databricks_cluster_policy": compute.ResourceClusterPolicy(),
 			"databricks_instance_pool":  compute.ResourceInstancePool(),
 			"databricks_job":            compute.ResourceJob(),
+			"databricks_job_task":       compute.ResourceJobTask(),
 			"databricks_pipeline":       compute.ResourcePipeline(),
 			"databricks_group":          identity.ResourceGroup(),

package compute

import (
	"context"
	"fmt"

	"github.com/databrickslabs/terraform-provider-databricks/common"
	"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

// ResourceJobTask manages a single task of a multi-task job.
func ResourceJobTask() *schema.Resource {
	// TODO: add mutex on job creation
	taskSchema := common.StructToSchema(JobTaskSettings{},
		func(t map[string]*schema.Schema) map[string]*schema.Schema {
			jobSettingsSchema(&t, "")
			t["job_id"] = &schema.Schema{
				Type:     schema.TypeString,
				Required: true,
			}
			// depends_on is a reserved keyword in Terraform,
			// so expose the field as `depends` instead
			t["depends"] = t["depends_on"]
			delete(t, "depends_on")
			return t
		})
	// resource ID is a composite of job_id and task_key
	p := common.NewPairID("job_id", "task_key")
	return common.Resource{
		Schema: taskSchema,
		Create: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			jobID := d.Get("job_id").(string)
			jobsAPI := NewJobsAPI(ctx, c)
			job, err := jobsAPI.Read(jobID)
			if err != nil {
				return fmt.Errorf("cannot load job %s: %w", jobID, err)
			}
			var task JobTaskSettings
			err = common.DataToStructPointer(d, taskSchema, &task)
			if err != nil {
				return err
			}
			if job.hasTask(task.TaskKey) {
				return fmt.Errorf("job '%s' already has '%s' task", job.Settings.Name, task.TaskKey)
			}
			// append the new task and write the full settings back
			job.Settings.Tasks = append(job.Settings.Tasks, task)
			err = jobsAPI.Update(jobID, *job.Settings)
			if err != nil {
				return fmt.Errorf("cannot update job %s: %w", jobID, err)
			}
			p.Pack(d)
			return nil
		},
		Read: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			jobID, taskKey, err := p.Unpack(d)
			if err != nil {
				return err
			}
			jobsAPI := NewJobsAPI(ctx, c)
			job, err := jobsAPI.Read(jobID)
			if err != nil {
				return err
			}
			if !job.Settings.isMultiTask() {
				return fmt.Errorf("job '%s' is of %s format", job.Settings.Name, job.Settings.Format)
			}
			for _, task := range job.Settings.Tasks {
				if task.TaskKey == taskKey {
					return common.StructToData(task, taskSchema, d)
				}
			}
			return common.NotFound(fmt.Sprintf("task %s not found", taskKey))
		},
		Delete: func(ctx context.Context, d *schema.ResourceData, c *common.DatabricksClient) error {
			jobID, taskKey, err := p.Unpack(d)
			if err != nil {
				return err
			}
			jobsAPI := NewJobsAPI(ctx, c)
			job, err := jobsAPI.Read(jobID)
			if err != nil {
				return err
			}
			if !job.hasTask(taskKey) {
				return common.NotFound(fmt.Sprintf("task %s not found", taskKey))
			}
			// keep every task except the one being deleted
			newTasks := []JobTaskSettings{}
			for _, task := range job.Settings.Tasks {
				if task.TaskKey == taskKey {
					continue
				}
				newTasks = append(newTasks, task)
			}
			job.Settings.Tasks = newTasks
			return jobsAPI.Update(jobID, *job.Settings)
		},
	}.ToResource()
}
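
On the TODO above: Create and Delete both do a read-modify-write of the whole job settings, so two databricks_job_task resources applied in parallel against the same job can lose each other's Update. A minimal sketch of serializing that section; jobTaskMutex is a hypothetical name, and a per-job lock would be the finer-grained option:

package compute

import "sync"

// jobTaskMutex guards the read-modify-write cycle in Create and Delete:
// Terraform applies resources in parallel, and unsynchronized appends to
// the same job's task list can otherwise be overwritten.
var jobTaskMutex sync.Mutex

// usage inside Create and Delete, before jobsAPI.Read(jobID):
//	jobTaskMutex.Lock()
//	defer jobTaskMutex.Unlock()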

terraform {
  required_providers {
    databricks = {
      source = "databrickslabs/databricks"
    }
  }
}

provider "databricks" {}

resource "databricks_job" "this" {
  name = "Terraform Demo (placeholder)"
  // need an API to create a placeholder job; otherwise subsequent task diffs override each other's changes
  format = "MULTI_TASK"
}

resource "databricks_job_task" "a" {
  job_id   = databricks_job.this.id
  task_key = "a"

  new_cluster {
    num_workers   = 8
    spark_version = "9.1.x-scala2.12"
    node_type_id  = "Standard_F4s"
  }

  notebook_task {
    notebook_path = "/Users/serge.smertin@databricks.com/Fourth"
  }
}

resource "databricks_job_task" "b" {
  job_id   = databricks_job.this.id
  task_key = "b"

  depends {
    task_key = databricks_job_task.a.task_key
  }

  existing_cluster_id = "x"

  notebook_task {
    notebook_path = "/Users/serge.smertin@databricks.com/Fifth"
  }
}

output "job_url" {
  value = "${databricks_job.this.url}/tasks"
}
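
Since Create packs the resource ID from job_id and task_key via common.NewPairID, an existing task should also be importable with the composite ID. Assuming the pair is joined with a | separator (the packing format is not shown in this gist) and 123 stands in for the numeric job ID, that would look like:

terraform import databricks_job_task.a "123|a"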