Created June 23, 2023 12:59
Save johschmidt42/bbe025d570a85cd6b60e70ba927ccc06 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def create_job_run_on_new_cluster(
    self,
    job_id: str,
    image_url: str,
    package_name: str,
    entry_point: str,
    positional_arguments: Optional[List[str]] = None,
    named_arguments: Optional[Dict[str, str]] = None,
    notification_ids: Optional[List[str]] = None,
    env_vars: Optional[dict] = None,
    cluster_kwargs: Optional[dict] = None,
) -> dict:
    """
    Create and trigger a one time job run using a new cluster.

    The run consists of a single ``python_wheel_task`` executed on a new
    single-node cluster that runs the given docker image.

    Args:
        job_id: The job id of the job run (also used as task key and run name)
        image_url: The docker image url to use
        package_name: The name of the python package to use
        entry_point: The entry point of the python package to use
        positional_arguments: Positional arguments for the entry_point.
            Mutually exclusive with ``named_arguments``.
        named_arguments: Named arguments for the python entry_point.
            Mutually exclusive with ``positional_arguments``.
        notification_ids: The notification ids to use (system notification
            ids); each one is notified on success and on failure of the run
        env_vars: Environment variables for new cluster (flat dict!)
        cluster_kwargs: Cluster spark settings for a new cluster; falls back
            to ``self.cluster``. Neither dict is modified.

    Returns:
        The decoded JSON body of the successful (HTTP 200) response.

    Raises:
        ValueError: If both positional and named arguments are given, or if
            the request does not return HTTP 200.
    """
    python_wheel_task: dict = _build_python_wheel_task(
        package_name=package_name,
        entry_point=entry_point,
        positional_arguments=positional_arguments,
        named_arguments=named_arguments,
    )
    cluster: dict = _build_new_cluster(
        base_cluster=cluster_kwargs if cluster_kwargs else self.cluster,
        image_url=image_url,
        registry_username=self.secrets_config.ACR_USERNAME,
        registry_password=self.secrets_config.ACR_PASSWORD,
        env_vars=env_vars,
    )
    task: dict = {
        "task_key": job_id,
        "python_wheel_task": python_wheel_task,
        "new_cluster": cluster,
    }
    payload: dict = {"tasks": [task], "run_name": job_id}
    if notification_ids:
        payload["webhook_notifications"] = _build_webhook_notifications(
            notification_ids
        )

    # The payload embeds the registry password; only log a masked copy.
    msg: str = f"Trying to create a job run. Payload: {_redact_payload(payload)}"
    logger.info(msg)
    headers: dict = {
        "Authorization": f"Bearer {self.secrets_config.DATABRICKS_PAT}"
    }
    response: Response = httpx.post(
        url=self.url_post_job_run, headers=headers, json=payload
    )
    try:
        data: dict = response.json()
    except JSONDecodeError:
        # Non-JSON error bodies (e.g. gateway HTML) are kept verbatim.
        data = {"detail": response.text}
    if response.status_code == 200:
        return data
    msg = (
        f"Could not create a job run: {data} "
        f"URL: {self.url_post_job_run} "
        f"HTTP Status Code: {response.status_code}"
    )
    logger.error(msg)
    raise ValueError(msg)


def _build_python_wheel_task(
    package_name: str,
    entry_point: str,
    positional_arguments: Optional[List[str]] = None,
    named_arguments: Optional[Dict[str, str]] = None,
) -> dict:
    """Build the ``python_wheel_task`` section of a run-submit payload.

    Raises:
        ValueError: If both ``positional_arguments`` and ``named_arguments``
            are non-empty.
    """
    if positional_arguments and named_arguments:
        # ``parameters`` and ``named_parameters`` are mutually exclusive in the
        # Jobs API; silently dropping one of them would hide a caller bug.
        raise ValueError(
            "positional_arguments and named_arguments are mutually exclusive"
        )
    task: dict = {"package_name": package_name, "entry_point": entry_point}
    if positional_arguments:
        # Send a JSON array of strings, not the Python repr of the list.
        task["parameters"] = [str(arg) for arg in positional_arguments]
    if named_arguments:
        task["named_parameters"] = {
            str(key): str(value) for key, value in named_arguments.items()
        }
    return task


def _build_new_cluster(
    base_cluster: dict,
    image_url: str,
    registry_username: str,
    registry_password: str,
    env_vars: Optional[dict] = None,
) -> dict:
    """Return a single-node ``new_cluster`` spec derived from *base_cluster*.

    *base_cluster* itself is left untouched.
    """
    # Work on a copy so settings such as ``spark_env_vars`` or ``docker_image``
    # do not leak into the shared default cluster or the caller's dict.
    cluster: dict = dict(base_cluster)
    # tags
    cluster["custom_tags"] = {"ResourceClass": "SingleNode"}
    # single node
    cluster["spark_conf"] = {"spark.databricks.cluster.profile": "singleNode"}
    if env_vars:
        cluster["spark_env_vars"] = env_vars
    cluster["docker_image"] = {
        "url": image_url,
        "basic_auth": {
            "username": registry_username,
            "password": registry_password,
        },
    }
    return cluster


def _build_webhook_notifications(notification_ids: List[str]) -> dict:
    """Notify every id in *notification_ids* on run success and on run failure."""
    # Each event takes a *list* of ``{"id": ...}`` destinations; build a fresh
    # list per event so the two entries never alias each other.
    return {
        event: [{"id": notification_id} for notification_id in notification_ids]
        for event in ("on_success", "on_failure")
    }


def _redact_payload(payload: dict) -> dict:
    """Return a deep copy of *payload* with docker registry passwords masked."""
    import copy

    redacted: dict = copy.deepcopy(payload)
    for task in redacted.get("tasks", []):
        basic_auth = (
            task.get("new_cluster", {}).get("docker_image", {}).get("basic_auth")
        )
        if isinstance(basic_auth, dict) and "password" in basic_auth:
            basic_auth["password"] = "***"
    return redacted
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.