Skip to content

Instantly share code, notes, and snippets.

@johschmidt42
Created June 23, 2023 12:59
Show Gist options
  • Save johschmidt42/bbe025d570a85cd6b60e70ba927ccc06 to your computer and use it in GitHub Desktop.
Save johschmidt42/bbe025d570a85cd6b60e70ba927ccc06 to your computer and use it in GitHub Desktop.
def create_job_run_on_new_cluster(
    self,
    job_id: str,
    image_url: str,
    package_name: str,
    entry_point: str,
    positional_arguments: Optional[List[str]] = None,
    named_arguments: Optional[Dict[str, str]] = None,
    notification_ids: Optional[List[str]] = None,
    env_vars: Optional[dict] = None,
    cluster_kwargs: Optional[dict] = None,
) -> dict:
    """
    Create and trigger a one time job run using a new cluster.

    The run consists of a single python wheel task that is executed inside
    the given docker image on a freshly created single node cluster.

    Args:
        job_id: The job id of the job run (used as task key and run name)
        image_url: The docker image url to use
        package_name: The name of the python package to use
        entry_point: The entry point of the python package to use
        positional_arguments: Positional arguments for the entry_point
        named_arguments: Named arguments for the python entry_point.
            Takes precedence over positional_arguments if both are given.
        notification_ids: The notification ids to use (system notification ids)
        env_vars: Environment variables for new cluster (flat dict!)
        cluster_kwargs: Cluster spark settings for a new cluster. Falls back
            to self.cluster when empty. "custom_tags", "spark_conf",
            "spark_env_vars" (if env_vars is given) and "docker_image" are
            always overwritten by this method.

    Returns:
        The decoded JSON body of the response when the status code is 200.

    Raises:
        ValueError: If the response status code is not 200.
    """
    wheel_task: dict = {
        "package_name": package_name,
        "entry_point": entry_point,
    }
    # The Jobs API expects a JSON array for "parameters" and a JSON object
    # for "named_parameters"; the two are mutually exclusive, so named
    # arguments win when both are supplied.
    if named_arguments:
        wheel_task["named_parameters"] = dict(named_arguments)
        if positional_arguments:
            logger.warning(
                "Both positional and named arguments were given; "
                "positional arguments are ignored."
            )
    elif positional_arguments:
        wheel_task["parameters"] = list(positional_arguments)

    # Work on a copy so that neither self.cluster nor the caller's dict is
    # modified (otherwise env vars / credentials leak into later calls).
    # A shallow copy suffices: only top-level keys are (re)assigned below.
    cluster: dict = dict(cluster_kwargs if cluster_kwargs else self.cluster)
    # tags
    cluster["custom_tags"] = {"ResourceClass": "SingleNode"}
    # single node
    cluster["spark_conf"] = {"spark.databricks.cluster.profile": "singleNode"}
    if env_vars:
        cluster["spark_env_vars"] = env_vars
    registry_auth: dict = {
        "username": self.secrets_config.ACR_USERNAME,
        "password": self.secrets_config.ACR_PASSWORD,
    }
    cluster["docker_image"] = {"url": image_url, "basic_auth": registry_auth}

    task: dict = {
        "task_key": job_id,
        "python_wheel_task": wheel_task,
        "new_cluster": cluster,
    }
    payload: dict = {"tasks": [task], "run_name": job_id}
    if notification_ids:
        # One {"id": ...} entry per destination. Only success/failure are
        # notified; on_start is left disabled.
        payload["webhook_notifications"] = {
            "on_success": [{"id": nid} for nid in notification_ids],
            "on_failure": [{"id": nid} for nid in notification_ids],
        }

    # Log a copy of the payload with the registry password masked so that
    # the secret never ends up in the logs.
    loggable_cluster: dict = {
        **cluster,
        "docker_image": {
            "url": image_url,
            "basic_auth": {**registry_auth, "password": "***"},
        },
    }
    loggable_payload: dict = {
        **payload,
        "tasks": [{**task, "new_cluster": loggable_cluster}],
    }
    msg: str = f"Trying to create a job run. Payload: {loggable_payload}"
    logger.info(msg)

    headers: dict = {
        "Authorization": f"Bearer {self.secrets_config.DATABRICKS_PAT}"
    }
    response: Response = httpx.post(
        url=self.url_post_job_run, headers=headers, json=payload
    )
    try:
        data: dict = response.json()
    except JSONDecodeError:
        # Non-JSON body (e.g. a plain-text error page): keep the raw text.
        data = {"detail": response.text}

    if response.status_code == 200:
        return data

    msg = (
        f"Could not create a job run: {data} "
        f"URL: {self.url_post_job_run} "
        f"HTTP Status Code: {response.status_code}"
    )
    logger.error(msg)
    raise ValueError(msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment