Skip to content

Instantly share code, notes, and snippets.

@afsalthaj
Created February 26, 2020 01:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afsalthaj/194ffcd6721c03babf268b3b8f1fb1ff to your computer and use it in GitHub Desktop.
import json
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class ZeroClustersAvailable(Exception): pass
class MoreThanOneClusterAvailable(Exception): pass
def lambda_handler(event, context):
    """Submit one or more identical Spark steps to the single matching EMR cluster.

    Looks up EMR clusters in WAITING or RUNNING state, requires exactly one
    whose name equals the Terraform-rendered ``${cluster_name}`` value, and
    adds ``event['count']`` copies of a spark-submit step to it.

    Expected ``event`` keys: ``extra_args``, ``class_name``, ``config_file``,
    ``job_name``, ``jar_name``, ``count``.

    Returns:
        The raw ``add_job_flow_steps`` response from the EMR API.

    Raises:
        ZeroClustersAvailable: no cluster with the expected name is available.
        MoreThanOneClusterAvailable: the cluster name is ambiguous.
    """
    client = boto3.client('emr')
    logger.info("Trying to list the clusters that are in either waiting or running state")
    clusters = client.list_clusters(
        ClusterStates=['WAITING', 'RUNNING']
    ).get('Clusters')

    # NOTE(review): this file is a Terraform template — ${cluster_name} and
    # ${account_id} are substituted at render time. The original compared
    # `== ${cluster_name}` with no quotes, which is a Python syntax error
    # unless the rendered value carries its own quotes; quoting the
    # placeholder here makes the template valid Python either way.
    cluster_name = '${cluster_name}'
    matching = [c for c in clusters if c.get('Name') == cluster_name]
    if not matching:
        # BUG FIX: the original f-string referenced an undefined name `env`
        # and would have raised NameError instead of this exception.
        raise ZeroClustersAvailable(
            f'Cannot run the engine (spark job). There are no clusters available '
            f'with the name {cluster_name} in waiting state or running state'
        )
    if len(matching) > 1:
        # BUG FIX: the original hard-coded "baseplus-dev" here, inconsistent
        # with the templated cluster name used for the lookup.
        raise MoreThanOneClusterAvailable(
            f'More than one cluster is available with the name {cluster_name} '
            f'and in states:[WAITING, RUNNING]'
        )
    emr_cluster = matching[0]

    extra_args = event.get('extra_args')
    class_name = event.get('class_name')
    config_file = event.get('config_file')
    job_name = event.get('job_name')
    jar_name = event.get('jar_name')
    logger.info(f'Running spark job with extra arguments: {extra_args}')
    logger.info(f'Running spark job with class name: {class_name}')
    logger.info(f'Running spark job with config file in: {config_file}')
    # BUG FIX: missing f-prefix — the literal text '{jobName}' was logged.
    logger.info(f'Running spark job: {job_name}')

    # ${account_id} is rendered by Terraform; keep it in a plain (non-f)
    # string so Python never tries to interpolate it.
    artifacts_bucket = 's3://${account_id}-baseplus-artifacts'

    # BUG FIX: 'Name' and the spark-submit command were plain strings in the
    # original, so '{jobName}', '{configFile}', '{className}', '{jarName}'
    # and '{extraArgs}' were passed through literally instead of being
    # interpolated from the event.
    job_map = {
        'Name': job_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Properties': [],
            'Jar': 'command-runner.jar',
            'Args': [
                '/bin/sh',
                '-c',
                'spark-submit --packages org.postgresql:postgresql:42.2.5 '
                f'--conf spark.driver.extraJavaOptions="-Ddance-card.config.bucket-name={artifacts_bucket}/{config_file}" '
                f'--class {class_name} --deploy-mode cluster '
                f'{artifacts_bucket}/artifacts/{jar_name} {extra_args}'
            ]
        }
    }
    # NOTE(review): a missing 'count' key raises TypeError (range(None)),
    # same as the original — confirm callers always supply it.
    response = client.add_job_flow_steps(
        JobFlowId=emr_cluster.get('Id'),
        Steps=[job_map for _ in range(event.get('count'))]
    )
    logger.info(response)
    return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment