Skip to content

Instantly share code, notes, and snippets.

@Mageswaran1989
Last active December 8, 2021 11:13
Show Gist options
  • Save Mageswaran1989/17b9f885ef40c06b64e274eac07e2d1e to your computer and use it in GitHub Desktop.
# https://dzone.com/articles/execute-spark-applications-with-apache-livy
# https://www.statworx.com/at/blog/access-your-spark-cluster-from-everywhere-with-apache-livy/
# https://livy.incubator.apache.org/docs/latest/rest-api.html
import time
from typing import Optional
import requests
import argparse
import json
class Livy:
    """Minimal client for the Apache Livy batch REST API.

    Submits a Spark batch job, polls its state, and fetches driver logs.
    Endpoints follow https://livy.incubator.apache.org/docs/latest/rest-api.html
    """

    # States treated as terminal by wait(); polling stops on any of these.
    # NOTE(review): Livy also reports "error" and "killed" as terminal —
    # confirm whether they should be added for the Livy version in use.
    STATES = ["success", "dead", "unknown"]
    SERVICE_NOT_AVAILABLE = "service is not available"
    TIMEOUT = 1 * 60 * 60  # overall wait budget, in seconds (1 hour)
    SLEEP_TIME = 5  # poll every 5 seconds

    def __init__(self, livy_url):
        # Base URL of the Livy server, e.g. "http://host:8998".
        self.livy_url = livy_url
        # Batch id assigned by Livy on submit; -1 on failure, None before submit.
        self.batch_id = None

    def submit_batch(self, params):
        """POST *params* (the batch definition dict) to /batches.

        Returns a ``(batch_id, state)`` tuple. On any failure — missing URL,
        non-201 response, or a network/JSON error — returns
        ``(-1, Livy.SERVICE_NOT_AVAILABLE)``.
        """
        print("Submitting the job...")
        run_state = None
        try:
            if self.livy_url:  # truthiness covers both None and ""
                url = f"{self.livy_url}/batches"
                ret = requests.post(url, json=params)
                if ret.status_code == 201:  # Livy answers 201 Created on success
                    res_json = ret.json()
                    self.batch_id = res_json.get("id", None)
                    run_state = res_json.get("state", None)
                else:
                    self.batch_id = -1
                    run_state = Livy.SERVICE_NOT_AVAILABLE
            else:
                self.batch_id = -1
                run_state = Livy.SERVICE_NOT_AVAILABLE
        except Exception as e:
            # Connection errors, malformed JSON, etc. are reported as
            # unavailability rather than propagated, so callers always get
            # a uniform (-1, SERVICE_NOT_AVAILABLE) failure value.
            print(e)
            self.batch_id = -1
            run_state = Livy.SERVICE_NOT_AVAILABLE
        return self.batch_id, run_state

    def get_logs(self):
        """Return the batch's log lines (list of str), or None on failure."""
        url = f"{self.livy_url}/batches/{self.batch_id}/log"
        log_state = None
        ret = requests.get(url)
        # BUG FIX: requests.get() never returns None (it raises on transport
        # errors), so the original `ret is not None` check was always true and
        # a 404/500 body without a 'log' key raised KeyError. Check the HTTP
        # status and use .get() instead.
        if ret.status_code == 200:
            log_state = ret.json().get("log")
        return log_state

    def get_state(self):
        """Return the batch's current state string, or "unknown" on a non-200."""
        url = f"{self.livy_url}/batches/{self.batch_id}/state"
        ret = requests.get(url)
        if ret.status_code == 200:
            res_json = ret.json()
            run_state = res_json.get("state", None)
        else:
            run_state = "unknown"
        return run_state

    def wait(self, sleep_time, timeout_value):
        """Poll every *sleep_time* seconds until the batch reaches a terminal
        state or *timeout_value* seconds have elapsed.

        Returns ``(elapsed_seconds, last_state)``. Note that on timeout the
        elapsed time equals *timeout_value* exactly and the state is still
        non-terminal — callers should check the state, not the elapsed time.
        """
        run_state = "running"
        run_time = 0
        while run_state not in Livy.STATES and run_time < timeout_value:
            print("\n\n Poll...", self.batch_id)
            # The log response is intentionally discarded; the call is kept so
            # network behavior is unchanged. TODO(review): print these logs or
            # drop the request entirely.
            self.get_logs()
            time.sleep(sleep_time)
            run_time = run_time + sleep_time
            run_state = self.get_state()
        return run_time, run_state
def main(params):
    """Build the Livy batch payload from CLI *params* and run the job to completion.

    *params* is an argparse Namespace with attributes: ``url`` (Livy server),
    ``name`` (job name prefix), ``file`` (application file), ``pyFiles``
    (optional single path), and ``args`` (optional space-separated string of
    application arguments).
    """
    server_url = params.url
    now = time.time()
    DOCKER_IMAGE_NAME = "private_repo/spark:123456"
    body_data = {
        # Timestamp suffix keeps job names unique across submissions.
        "name": params.name + str(now),
        "file": params.file,
        "driverMemory": "6G",
        "driverCores": 4,
        "executorMemory": "32G",
        "executorCores": 5,
        "numExecutors": 2,
        "conf": {
            # add required configs here
            "spark.submit.deployMode": "cluster",
            "spark.scheduler.mode": "FAIR",
            # Run executors and the application master inside a Docker image
            # via the YARN container runtime.
            "spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
            "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": DOCKER_IMAGE_NAME,
            "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE": "docker",
            "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE": DOCKER_IMAGE_NAME,
            "spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG": "hdfs:///path/to/hadoop/config.json",
            "spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG": "hdfs:///path/to/hadoop/config.json",
            "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON": "python3",
            "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "python3",
        },
    }
    if params.pyFiles:
        body_data["pyFiles"] = [params.pyFiles]
    if params.args:
        body_data["args"] = params.args.split(" ")
    livy = Livy(livy_url=server_url)
    job_id, state = livy.submit_batch(body_data)
    if state == Livy.SERVICE_NOT_AVAILABLE:
        print(state)
        print("Not Available")
    else:
        time_taken, state = livy.wait(Livy.SLEEP_TIME,
                                      Livy.TIMEOUT)
        # BUG FIX: wait() stops as soon as run_time >= timeout_value, so on a
        # timeout time_taken equals Livy.TIMEOUT exactly and the original
        # `time_taken > Livy.TIMEOUT` check never fired. Decide by the final
        # state instead: a non-terminal state after wait() means we gave up.
        if state not in Livy.STATES:
            print("Timeout...")
        else:
            print(f"Took {time_taken} seconds and with {state} status")
if __name__ == "__main__":
    # Command-line entry point: collect Livy job options and delegate to main().
    cli = argparse.ArgumentParser(description='PySpark Livy Job Submitter')
    cli.add_argument("--name", type=str, default="LivySpark")
    cli.add_argument("--url", type=str)
    cli.add_argument("--pyFiles", type=str)
    cli.add_argument("--file", type=str)
    cli.add_argument("--args", type=str, required=False)
    main(cli.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment