Synapse Spark job submission via the REST API
#!/usr/bin/env python3
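# Submit Azure Synapse Spark jobs through the Spark job definition / Livy REST APIs,
# poll each job until it finishes, and log timings to a file and to stdout.
# Assumptions (adjust for your environment): the Azure CLI is installed and logged in
# with access to the workspace, and the Spark job definitions named "<step>_dynamic"
# already exist (uncomment the create_spark_job(...) call below to create them first).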
import os
import logging
from subprocess import run, PIPE
import requests
import json
import time
from datetime import datetime
now_file = f"{datetime.utcnow().strftime('%X').replace(':','-')}.log"
print(f"Log Filename: {now_file}")
# logging.basicConfig(filename=now_file,format="%(asctime)s:%(levelname)s:%(message)s")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(now_file),
        logging.StreamHandler(),
    ],
)
# logging.info("This should be logged")
# Synapse workspace, Spark pool and storage settings
synapse_wksp = "synapse-wksp-01"
spark_pool = "sparkpool01"
synapse_wksp_stg = "synapsestorage"
synapse_wksp_container = "containerstorage"
# ADLS Gen2 directory that holds the PySpark files referenced by the job definitions
batch_job_dir = f"abfss://{synapse_wksp_container}@{synapse_wksp_stg}.dfs.core.windows.net/synapse/workspaces/{synapse_wksp}/batchjobs/SparkJobDef2"
synapse_endpoint = f"https://{synapse_wksp}.dev.azuresynapse.net"
# Livy batches endpoint, used to poll job status
spark_batch_job_url = f"{synapse_endpoint}/livyApi/versions/2019-11-01-preview/sparkPools/{spark_pool}/batches"
# Each workload maps a directory name to the ordered list of PySpark step files to run
workload_1 = {"wl1": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]}
workload_2 = {"wl2": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]}
workload_3 = {"wl3": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]}
workload_4 = {"wl4": ["step01.py", "step02.py", "step03.py", "step04.py", "step05.py", "step06.py", "summary.py"]}
wls = [workload_1, workload_4, workload_2, workload_3]
# Get a bearer token for the Synapse dev endpoint via the Azure CLI
def get_token():
    az_command = ['az', 'account', 'get-access-token', '--resource',
                  'https://dev.azuresynapse.net', '--query', 'accessToken']
    result = run(az_command, stdout=PIPE, stderr=PIPE, universal_newlines=True)
    bearer_token = result.stdout.strip().strip('"')
    auth_header = {"Authorization": f"Bearer {bearer_token}"}
    return auth_header
auth_header = get_token()
# Spark conf overrides for wl3 step04.py (heavier shuffle: 15000 partitions, 2 CPUs per task)
wl_3_job_4_conf_01 = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "32",
    "spark.dynamicAllocation.maxExecutors": "186",
    "spark.dynamicAllocation.executorIdleTimeout": "1800s",
    "spark.sql.shuffle.partitions": "15000",
    "spark.task.cpus": "2",
}
# Spark conf overrides for wl3 step05.py (adaptive query execution and skew-join handling enabled)
wl_3_job_5_conf_01 = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "32",
    "spark.dynamicAllocation.maxExecutors": "186",
    "spark.dynamicAllocation.executorIdleTimeout": "1800s",
    "spark.sql.shuffle.partitions": "5000",
    "spark.task.cpus": "2",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
}
# Default conf for all other jobs
default_conf = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "32",
    "spark.dynamicAllocation.maxExecutors": "186",
    "spark.dynamicAllocation.executorIdleTimeout": "1800s",
    "spark.sql.shuffle.partitions": "500",
}
# Spark job definition template; name, file and conf are filled in per job by create_spark_job()
payload = {
    "properties": {
        "targetBigDataPool": {
            "referenceName": spark_pool,
            "type": "BigDataPoolReference"
        },
        "requiredSparkVersion": "3.0",
        "language": "python",
        "jobProperties": {
            "name": None,
            "file": None,
            "conf": None,
            "numExecutors": 32,
            "executorCores": 16,
            "executorMemory": "112g",
            "driverCores": 16,
            "driverMemory": "112g"
        }
    }
}
# Create (or update) a Spark job definition for the given PySpark file and conf
def create_spark_job(app_name, stg_file_path, conf):
    spark_job_create_url = f"{synapse_endpoint}/sparkJobDefinitions/{app_name}"
    payload['properties']['jobProperties']['name'] = app_name
    payload['properties']['jobProperties']['file'] = stg_file_path
    payload['properties']['jobProperties']['conf'] = conf
    # Job definitions only need to be created once, before the jobs are executed,
    # so the auth token does not need to be renewed very frequently here.
    # auth_header = get_token()
    # r = requests.delete(spark_job_create_url, params={"api-version": "2019-06-01-preview"}, headers=auth_header)  # Delete the job to ensure there is no old config
    # print(r.json())
    time.sleep(1)
    r = requests.put(spark_job_create_url, params={"api-version": "2019-06-01-preview"}, json=payload, headers=auth_header)
    response = r.json()
    logging.info(f"submit_response_status: {r.status_code}\nsubmit_response: {response}")
# Execute an existing Spark job definition; returns the Livy batch id, retrying on failure
def execute_spark_job(app_name):
    while True:
        try:
            auth_header = get_token()
            spark_job_submit_url = f"{synapse_endpoint}/sparkJobDefinitions/{app_name}/execute"
            submit_response = requests.post(spark_job_submit_url, params={"api-version": "2019-06-01-preview"}, headers=auth_header)
            if submit_response.status_code == 202:
                livy_id = submit_response.json()['id']
                return livy_id
            logging.error(f"Error submitting job: {submit_response.json()}")
            time.sleep(2)
        except Exception as e:
            logging.error(f"Exception while submitting job: {e}")
            time.sleep(2)
# Execute the Spark batch jobs for each workload, one file at a time, and wait for each to finish
for wl in wls:
    logging.info(f"{wl} - Start time: {datetime.utcnow()}")
    for dir, files_list in wl.items():
        for file_name in files_list:
            logging.info(f"File_name: {file_name}")
            # Pick the per-job Spark conf overrides, falling back to the default conf
            if dir == "wl3" and file_name == 'step04.py':
                conf = wl_3_job_4_conf_01
            elif dir == "wl3" and file_name == 'step05.py':
                conf = wl_3_job_5_conf_01
            else:
                conf = default_conf
            app_name = file_name.split('.')[0] + "_dynamic"  # Use static for 2000 cores
            stg_file_path = f"{batch_job_dir}/{dir}/{file_name}"  # Path of the PySpark file to submit
            # create_spark_job(app_name, stg_file_path, conf)  # Create the Spark job definition first
            livy_id = execute_spark_job(app_name)  # The job definition should already exist
            logging.info(f"Livy Id: {livy_id}")
            # Poll the Livy batches endpoint until the job reaches a terminal state
            while True:
                try:
                    spark_batch_job_url_status = f"{spark_batch_job_url}/{livy_id}"
                    auth_header = get_token()
                    r = requests.get(spark_batch_job_url_status, params={"detailed": True}, headers=auth_header)
                    r_json = r.json()
                    state, result = r_json["state"], r_json["result"]
                    if state == "success" and result == "Succeeded":
                        appId = r_json['appId']
                        appName = r_json['livyInfo']['jobCreationRequest']['name']
                        submittedAt = r_json['schedulerInfo']['submittedAt']
                        scheduledAt = r_json['schedulerInfo']['scheduledAt']
                        endedAt = r_json['schedulerInfo']['endedAt']
                        app_result = f"livy_id:{livy_id}\tappId:{appId}\tappName:{appName}\tsubmittedAt:{submittedAt}\tscheduledAt:{scheduledAt}\tendedAt:{endedAt}\n"
                        logging.info(app_result)
                        break
                    if state in ("dead", "error", "killed"):
                        # Stop polling on terminal failure states instead of looping forever
                        logging.error(f"Job {livy_id} did not succeed: state={state} result={result}")
                        break
                except Exception as e:
                    logging.error(f"Exception while monitoring job: {e}")
                time.sleep(30)  # Polling interval (assumption: 30s is frequent enough)
    logging.info(f"{wl} - End time: {datetime.utcnow()}")