@PedroMartinSteenstrup (last active August 12, 2022)
EMR utilities: on-demand cluster spin-up, Livy sessions, and an @emr_cluster decorator for running Python functions on a transient Spark cluster.
import boto3

from analytics_utils.config import config_2


class EMR:
    def __init__(self, **kwargs):
        self.config_object = getattr(config_2, 'EMRUtils')(**kwargs)
        self.config = self.config_object.config
        self.cluster_config = None
        self._set_client()

    def _set_client(self):
        self.client = boto3.client('emr', region_name=self.config['aws_region'])
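# Usage sketch (not part of the original gist): EMRUtils comes from an internal config
# package, so the keyword arguments below are assumptions. All this wrapper guarantees
# is that `self.config` is a dict containing 'aws_region' and that `self.client` is a
# ready-to-use boto3 EMR client.
#
#     emr = EMR(environment='prod')                     # kwargs forwarded to EMRUtils
#     running = emr.client.list_clusters(ClusterStates=['RUNNING'])
#     print(emr.config['aws_region'], len(running['Clusters']))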
import inspect
import json
import time

import boto3
import requests

JSON_HEADER = {'Content-Type': 'application/json'}
class Cluster:
    def __init__(self, instance_count=3, instance_type='m4.large',
                 instance_type_master=None, instance_type_worker=None):
        self.client = boto3.client('emr', region_name='eu-central-1')
        print('Waiting for the EMR cluster to boot...', flush=True)
        # Launch an on-demand cluster with Spark, Livy and Zeppelin; the S3 paths,
        # subnet and key name are placeholders and must be filled in before use.
        spin_up_response = self.client.run_job_flow(
            Name="Spark On-Demand",
            LogUri='s3://xxxxx-xx-xxx/emr/logs',
            ReleaseLabel='emr-5.24.1',
            Instances={
                'InstanceGroups': [
                    {
                        'Name': "Master nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'MASTER',
                        'InstanceType': instance_type_master or instance_type,
                        'InstanceCount': 1,
                        'Configurations': [
                            {
                                'Classification': 'spark-env',
                                'Configurations': [
                                    {
                                        'Classification': 'export',
                                        'Properties': {
                                            'PYSPARK_PYTHON': '/usr/bin/python3'
                                        }
                                    }
                                ]
                            },
                            {
                                'Classification': 'spark-defaults',
                                'Properties': {
                                    'spark.delta.logStore.class': "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore"
                                }
                            }
                        ]
                    },
                    {
                        'Name': "Slave nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'CORE',
                        'InstanceType': instance_type_worker or instance_type,
                        'InstanceCount': instance_count - 1
                    }
                ],
                'KeepJobFlowAliveWhenNoSteps': True,
                'TerminationProtected': False,
                'Ec2SubnetId': 'subnet-xxxxxxxxxx',
                'Ec2KeyName': 'xxxxxxxxxxxxxxx'
            },
            Applications=[
                {'Name': 'hadoop'},
                {'Name': 'spark'},
                {'Name': 'hive'},
                {'Name': 'livy'},
                {'Name': 'zeppelin'}
            ],
            Tags=[
                {
                    'Key': 'Name',
                    'Value': 'spark on-demand'
                },
                {
                    'Key': 'hostgroup',
                    'Value': 'analytics'
                }
            ],
            BootstrapActions=[
                {
                    'Name': 'Prepare Python environment',
                    'ScriptBootstrapAction': {
                        'Path': 's3://htg-dw-c1/emr/bootstrap.sh'
                    }
                }
            ],
            Steps=[
                {
                    'Name': 'Delta Lake - Copy Jar',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['sudo', 'aws', 's3', 'cp', 's3://xxxxxxxxxx.jar',
                                 '/usr/lib/livy/repl_2.11-jars']
                    }
                },
                {
                    'Name': 'Apache Livy - Copy Setup',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['aws', 's3', 'cp', 's3://xxxxxxxxxx.sh', '/home/hadoop/']
                    }
                },
                {
                    'Name': 'Apache Livy - Run Setup',
                    'ActionOnFailure': 'CANCEL_AND_WAIT',
                    'HadoopJarStep': {
                        'Jar': 'command-runner.jar',
                        'Args': ['bash', '/home/hadoop/step-livy.sh']
                    }
                }
            ],
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole'
        )
        self.id = spin_up_response['JobFlowId']
        self.client.get_waiter('cluster_running').wait(ClusterId=self.id)
        # list_steps returns the most recently submitted step first, so index 0
        # is the final setup step (the Livy setup script).
        final_step_id = self.client.list_steps(ClusterId=self.id)['Steps'][0]['Id']
        self.client.get_waiter('step_complete').wait(ClusterId=self.id, StepId=final_step_id)
        print(f'...cluster ({self.id}) is now running.', flush=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        print(f'Terminating the cluster ({self.id}).', flush=True)
        self.terminate()

    @property
    def dns(self):
        description_response = self.client.describe_cluster(ClusterId=self.id)
        return description_response['Cluster']['MasterPublicDnsName']

    def terminate(self):
        self.client.terminate_job_flows(JobFlowIds=[self.id])
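# Usage sketch (not part of the original gist): the context-manager protocol guarantees
# the cluster is terminated even if the body raises, so it only exists while work is
# running. The two waiters in __init__ block until the cluster is up and until the last
# setup step has finished.
#
#     with Cluster(instance_count=4, instance_type_worker='r5.xlarge') as cluster:
#         print(cluster.dns)   # master node DNS; port 8998 is the Livy endpoint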
class Session:
    def __init__(self, cluster_dns):
        self.dns = cluster_dns
        # With an empty id, self.url points at the /sessions collection, so the POST
        # below creates a new PySpark session through Livy.
        self.id = ''
        response = requests.post(self.url, data=json.dumps({'kind': 'pyspark'}), headers=JSON_HEADER)
        self.id = response.json()['id']
        print('Establishing a Spark session...', flush=True)
        status = None
        while status != 'idle':
            time.sleep(3)
            status_response = requests.get(self.url, headers=JSON_HEADER)
            status = status_response.json()['state']
        print(f'...session ({self.id}) is ready.', flush=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

    @property
    def url(self):
        return f'http://{self.dns}:8998/sessions/{self.id}'

    def terminate(self):
        print(f'Terminating the session ({self.id}).', flush=True)
        requests.delete(self.url, headers=JSON_HEADER)

    def submit(self, code, follow=True):
        endpoint = f'{self.url}/statements'
        response = requests.post(endpoint, data=json.dumps({'code': code}), headers=JSON_HEADER)
        endpoint = f"{endpoint}/{response.json()['id']}"
        if follow:
            state = None
            while state != 'available':
                time.sleep(3)
                response_json = requests.get(endpoint, headers=JSON_HEADER).json()
                state = response_json['state']
            if response_json['output']['status'] == 'error':
                print(f"Statement exception: {response_json['output']['evalue']}")
                for trace in response_json['output']['traceback']:
                    print(trace)
            # Error responses may not carry a 'data' field, so fall back to an empty dict.
            output = response_json['output'].get('data', {}).get('text/plain')
            if output is not None:
                print(output, flush=True)
        return response
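# Usage sketch (not part of the original gist): Session talks to Livy on port 8998 of the
# master node, and submit() blocks until the statement finishes, printing any text/plain
# output or the remote traceback on error.
#
#     with Cluster() as cluster:
#         with Session(cluster.dns) as session:
#             session.submit('print(spark.range(10).count())')  # `spark` is predefined by Livy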
def emr_cluster(**kwargs_cluster):
    def decorator(func):
        def wrapper(*args, **kwargs):
            with Cluster(**kwargs_cluster) as cluster:
                with Session(cluster.dns) as session:
                    # Ship the function source (minus the single decorator line) to the
                    # Livy session, then append a call with the literal argument values.
                    func_code = inspect.getsource(func).split('\n')[1:]
                    arguments_raw = [('', arg) for arg in args] + [(f'{key}=', value) for key, value in kwargs.items()]
                    arguments = ', '.join(f"{k}'{v}'" if isinstance(v, str) else f'{k}{v}' for k, v in arguments_raw)
                    func_call = f'{func.__name__}({arguments})'
                    session.submit('\n'.join(func_code + [func_call]))
        return wrapper
    return decorator
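# Example (not part of the original gist): because only the function's source is shipped to
# the cluster, the decorated function must be self-contained, i.e. defined at module level,
# doing its own imports and not referencing other module-level names. The bucket paths and
# instance settings below are placeholders.
@emr_cluster(instance_count=5, instance_type='m5.xlarge')
def convert_to_delta(source_path, target_path):
    # Runs remotely inside the Livy PySpark session, where `spark` is predefined.
    spark.read.parquet(source_path).write.format('delta').save(target_path)


if __name__ == '__main__':
    # Calling the decorated function spins up the cluster, runs the body through Livy,
    # prints any text/plain output, then terminates the session and the cluster.
    convert_to_delta('s3://my-bucket/raw/events/', 's3://my-bucket/delta/events/')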