Script to update legacy instance types on Databricks jobs
import json
import requests

################################################################
## Replace the token variable and environment url below
################################################################

# Helper to pretty print json
def pprint_j(i):
    print(json.dumps(i, indent=4, sort_keys=True))
class DatabricksRestClient:
    """A class to define wrappers for the REST API"""

    def __init__(self, token="ABCDEFG1234", url="https://myenv.cloud.databricks.com"):
        self.__token = {'Authorization': 'Bearer {0}'.format(token)}
        self.__url = url

    def get(self, endpoint, json_params=None, printJson=False):
        if json_params:
            results = requests.get(self.__url + '/api/2.0' + endpoint,
                                   headers=self.__token, params=json_params).json()
        else:
            results = requests.get(self.__url + '/api/2.0' + endpoint,
                                   headers=self.__token).json()
        if printJson:
            print(json.dumps(results, indent=4, sort_keys=True))
        return results

    def post(self, endpoint, json_params=None, printJson=True):
        if json_params:
            raw_results = requests.post(self.__url + '/api/2.0' + endpoint,
                                        headers=self.__token, json=json_params)
            results = raw_results.json()
        else:
            print("Must have a payload in the json_params argument.")
            return {}
        if printJson:
            print(json.dumps(results, indent=4, sort_keys=True))
        # if the response body is empty, return just the HTTP status code
        if results:
            results['http_status_code'] = raw_results.status_code
            return results
        else:
            return {'http_status_code': raw_results.status_code}
    def list_node_types(self):
        return self.get("/clusters/list-node-types")

    def get_jobs_list(self, printJson=False):
        """Returns an array of json objects for jobs"""
        jobs = self.get("/jobs/list", printJson=printJson)
        return jobs['jobs']

    def get_job_id(self, name):
        jobs = self.get_jobs_list()
        for i in jobs:
            if i['settings']['name'] == name:
                return i['job_id']
        return None

    def create_or_update_job_config(self, job_name, new_config):
        job_id = self.get_job_id(job_name)
        if job_id is None:
            # job does not exist yet: create it from the new config
            print("Job does not exist. Create new job: ")
            resp = self.post('/jobs/create', json_params=new_config)
        else:
            resp = self.get('/jobs/get?job_id={0}'.format(job_id))
            print("Print the current configs: ")
            cur_config = resp.pop('settings')
            cur_name = cur_config['name']
            pprint_j(cur_config)
            # keep the existing job name, and point the reset payload at the existing job_id
            new_config['name'] = cur_name
            update_config = resp
            update_config['new_settings'] = new_config
            print("Applying new configs: ")
            pprint_j(update_config)
            resp = self.post('/jobs/reset', json_params=update_config)
        print(resp)

    def print_job_details(self, job_id):
        resp = self.get('/jobs/get?job_id={0}'.format(job_id))
        print("Job details: ")
        pprint_j(resp)

    def get_spark_versions(self):
        return self.get("/clusters/spark-versions")
######################
# Global functions
######################

def update_old_node_types(job_conf, node_type=1):
    # pop the old settings off the /jobs/get response
    job_settings = job_conf.pop('settings')
    # grab cluster settings
    cluster_conf = job_settings['new_cluster']
    # set the new instance type
    if node_type == 1:
        cluster_conf['node_type_id'] = 'r3.xlarge'
        cluster_conf['driver_node_type_id'] = 'r3.xlarge'
    else:
        cluster_conf['node_type_id'] = 'c3.2xlarge'
        cluster_conf['driver_node_type_id'] = 'c3.2xlarge'
    # default to an empty dict so jobs without aws_attributes still get EBS settings
    aws_attr = cluster_conf.pop('aws_attributes', {})
    # add at least 1 EBS volume
    aws_attr['ebs_volume_count'] = 1
    aws_attr['ebs_volume_size'] = 100
    aws_attr['ebs_volume_type'] = 'GENERAL_PURPOSE_SSD'
    cluster_conf['aws_attributes'] = aws_attr
    # reset the cluster conf
    job_settings['new_cluster'] = cluster_conf
    # set the new configs under 'new_settings' for the /jobs/reset payload
    job_conf['new_settings'] = job_settings
    # drop created_time from the payload before posting
    job_conf.pop('created_time', None)
    return job_conf
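
# Illustrative sketch of what update_old_node_types does to a /jobs/get
# response. Every field value below is made up for the example; real
# responses carry more fields (schedule, libraries, email_notifications, ...).
#
#   sample = {'job_id': 42, 'created_time': 1520000000000,
#             'settings': {'name': 'nightly-etl',
#                          'new_cluster': {'node_type_id': 'memory-optimized',
#                                          'num_workers': 2}}}
#   pprint_j(update_old_node_types(sample, node_type=1))
#   # -> 'settings' moves under 'new_settings', both node types become
#   #    r3.xlarge, and aws_attributes gains one 100 GB general-purpose SSD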
def get_legacy_job_ids(jl, node_type=1):
    # node_type = 1 is memory-optimized
    # node_type = 2 is compute-optimized
    if node_type == 1:
        legacy = "memory-optimized"
    else:
        legacy = "compute-optimized"
    # only jobs that define a new_cluster carry a node_type_id to check
    new_clusters = [x for x in jl if x['settings'].get('new_cluster') is not None]
    legacy_jobs = [x for x in new_clusters if x['settings']['new_cluster']['node_type_id'] == legacy]
    return [job['job_id'] for job in legacy_jobs]
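
# Illustrative sketch: get_legacy_job_ids skips jobs that run on existing
# clusters, so with this made-up list only job 1 is returned.
#
#   sample_jobs = [
#       {'job_id': 1, 'settings': {'name': 'a',
#                                  'new_cluster': {'node_type_id': 'memory-optimized'}}},
#       {'job_id': 2, 'settings': {'name': 'b', 'existing_cluster_id': '0101-abcdef'}},
#   ]
#   print(get_legacy_job_ids(sample_jobs, node_type=1))  # [1]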
#####################################
# Fix legacy instance types
# Update the token and url below
#####################################

# Get rest api credentials
token = "YOUR_TOKEN_HERE"
client = DatabricksRestClient(token, 'https://YOUR_ENV.cloud.databricks.com')

# Grab the job list and collect the ids of jobs still on legacy node types
job_list = client.get_jobs_list(False)
legacy_memory_job_ids = get_legacy_job_ids(job_list, 1)
legacy_compute_job_ids = get_legacy_job_ids(job_list, 2)

# Save each old config to a local file, then reset the job to the new node types
with open('old_cluster_configs.json', 'w') as outfile:
    for jid in legacy_memory_job_ids:
        outfile.write("\nOld job memory config: \n")
        job_conf = client.get('/jobs/get?job_id={0}'.format(jid))
        json.dump(job_conf, outfile)
        new_job_conf = update_old_node_types(job_conf, 1)
        outfile.write("\nNew job config: \n")
        json.dump(new_job_conf, outfile)
        resp = client.post('/jobs/reset', new_job_conf)
        pprint_j(resp)
    for jid in legacy_compute_job_ids:
        outfile.write("\nOld job compute config: \n")
        job_conf = client.get('/jobs/get?job_id={0}'.format(jid))
        json.dump(job_conf, outfile)
        new_job_conf = update_old_node_types(job_conf, 2)
        outfile.write("\nNew job config: \n")
        json.dump(new_job_conf, outfile)
        resp = client.post('/jobs/reset', new_job_conf)
        pprint_j(resp)
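
# Optional follow-up (a sketch, not in the original script): spot-check the
# updated jobs by re-fetching their configs after the resets.
#
#   for jid in legacy_memory_job_ids + legacy_compute_job_ids:
#       client.print_job_details(jid)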