Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Last active October 27, 2022 23:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onefoursix/4da96a9021c9bba674dd946ad6bfd406 to your computer and use it in GitHub Desktop.
Save onefoursix/4da96a9021c9bba674dd946ad6bfd406 to your computer and use it in GitHub Desktop.
Python Script to run a StreamSets DataOps Platform Job on a dynamically deployed Engine on AWS
#!/usr/bin/env python
'''This Python script creates and starts a DataOps Platform AWS Deployment of a single
SDC Engine, starts a Job that runs on that Engine, waits for the Job to complete, then
stops and deletes the SDC Engine and Deployment.
Prerequisites:
- An AWS Environment configured in DataOps Platform in an Active state
- A Python 3.4 or higher environment to run this script
- The StreamSets SDK for Python module installed in your Python environment, see
https://docs.streamsets.com/platform-sdk/latest/learn/installation.html
- A StreamSets batch Job to be run on the Engine
'''
# Imports
import datetime, sys
from time import sleep
from streamsets.sdk import ControlHub
## USER VARIABLES ##############################################
# --- Control Hub API credentials ---------------------------------
# CRED_ID -- the ID of your Control Hub API Credential
CRED_ID = ''
# CRED_TOKEN -- the token of your Control Hub API Credential
CRED_TOKEN = ''

# --- Job selection -----------------------------------------------
# ID of the batch Job to run on the ephemeral Engine
JOB_ID = ''
# Label that ties the Job to the ephemeral SDC Engine. The script also uses
# it as the Deployment's name and Deployment tag.
LABEL = ''

# --- AWS / EC2 settings ------------------------------------------
# Name of the AWS Environment configured in Control Hub
AWS_ENVIRONMENT_NAME = ''
# Name of an existing AWS SSH key pair to set on the EC2 instances
AWS_KEY_NAME = ''
# AWS tags applied to the Deployment (placeholder pair shown)
AWS_DEPLOYMENT_TAGS = {'key': 'value'}
# Instance Profile ARN assigned to the EC2 instances
EC2_INSTANCE_PROFILE = ''
# EC2 instance type to launch
EC2_INSTANCE_TYPE = ''
# Number of EC2 instances (Engines) to spin up
NUM_INSTANCES = 1

# --- Engine Stage Libraries ----------------------------------------
# Custom Stage Libraries, added on top of the base libraries the script always installs
SDC_STAGE_LIBS = ['jython_2_7', 'jdbc']

# --- Timeouts and polling (seconds) --------------------------------
# Upper bound on waiting for the Engine to register with Control Hub (5 minutes)
MAX_WAIT_SECONDS_FOR_ENGINE_DEPLOYMENT = 5 * 60
# Interval between status polls against Control Hub
POLLING_FREQUENCY_SECONDS = 30
# Upper bound on waiting for the Job to become ACTIVE.
# NOTE: this is shorter than POLLING_FREQUENCY_SECONDS, so in practice the
# Job gets roughly one polling interval to become ACTIVE.
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE = 20
# Upper bound on waiting for the Job to finish (10 minutes)
MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE = 10 * 60
##############################################
def print_message(message):
    """Write *message* to stdout, prefixed by the current local time.

    The prefix has the form ``YYYY-mm-dd HH:MM:SS`` and is separated from the
    message by one space. *message* must be a ``str``; any other type raises
    ``TypeError``.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(' '.join([timestamp, message]))
# Init connection to Control Hub
print_message('Connecting to Control Hub')
sch = ControlHub(credential_id=CRED_ID, token=CRED_TOKEN)
# Look up the AWS Environment with the configured name; the first match wins
# and None means no such Environment exists.
environment = next(
    (env for env in sch.environments
     if env.environment_type == 'AWS' and env.environment_name == AWS_ENVIRONMENT_NAME),
    None,
)
if environment is None:
    print_message('ERROR: could not find AWS environment named \'' + AWS_ENVIRONMENT_NAME + '\'')
    # Use sys.exit rather than the bare exit() helper: exit() is injected by the
    # site module for interactive use and is missing under `python -S` or in
    # frozen executables. The exit status (-1) is unchanged.
    sys.exit(-1)
print_message('Found AWS environment named \'' + AWS_ENVIRONMENT_NAME + '\'')
print_message('Creating AWS Deployment named \'' + LABEL + '\'')
# Get an AWS (EC2) Deployment Builder
deployment_builder = sch.get_deployment_builder(deployment_type='EC2')
# Define an AWS Deployment. It is named and tagged with LABEL, the same label
# the Job is pinned to later in this script.
deployment = deployment_builder.build(
    deployment_name=LABEL,
    environment=environment,
    engine_type='DC',
    # NOTE(review): the engine version is hard-coded; confirm it matches the
    # Data Collector version your Job's pipeline was built for.
    engine_version='5.2.0',
    deployment_tags=[LABEL],
)
# Configure the AWS Deployment
deployment.engine_instances = NUM_INSTANCES
deployment.ec2_instance_type = EC2_INSTANCE_TYPE
deployment.ssh_key_source = 'EXISTING_KEY_PAIR_NAME'
deployment.key_pair_name = AWS_KEY_NAME
deployment.aws_tags = AWS_DEPLOYMENT_TAGS
deployment.instance_profile = EC2_INSTANCE_PROFILE
# Add the Deployment to Control Hub. Its engine configuration is edited below
# and then pushed back with update_deployment.
sch.add_deployment(deployment)
# These base Stage Libs should always be included
base_stage_libs = ['dataformats', 'dev', 'basic', 'stats']
print_message('Adding Stage Libraries to the Deployment: ' + str(SDC_STAGE_LIBS))
# Build the complete list first and assign it once through the stage_libs
# setter. This avoids mutating whatever object the getter returns.
# Duplicates are dropped in order (e.g. a custom entry repeating a base
# library), without relying on dict ordering for old Python versions.
stage_libs = []
for stage_lib in base_stage_libs + list(SDC_STAGE_LIBS):
    if stage_lib not in stage_libs:
        stage_libs.append(stage_lib)
deployment.engine_configuration.stage_libs = stage_libs
# Update the Deployment
sch.update_deployment(deployment)
# Start the Deployment
print_message('Starting the AWS Deployment. This may take a couple of minutes...')
sch.start_deployment(deployment)
## Wait for the new SDC instance to register with Control Hub.
## An Engine counts as registered once any registered Data Collector reports
## LABEL among its labels. Polls every POLLING_FREQUENCY_SECONDS until that
## happens or MAX_WAIT_SECONDS_FOR_ENGINE_DEPLOYMENT is exceeded.
wait_seconds = 0
sdc_is_registered = False
while True:
    print_message('Waiting for Engine to register with Control Hub...')
    sdc_is_registered = any(LABEL in sdc.reported_labels for sdc in sch.data_collectors)
    if sdc_is_registered:
        print_message('Engine has registered with Control Hub')
        break
    if wait_seconds > MAX_WAIT_SECONDS_FOR_ENGINE_DEPLOYMENT:
        # The Deployment is deliberately left in place on this failure so it
        # can be inspected in the Control Hub UI for misconfiguration.
        # Uncomment the next line for automatic clean up instead.
        # sch.delete_deployment(deployment)
        sys.exit('Error: Timeout waiting for Engine to register with Control Hub. ')
    sleep(POLLING_FREQUENCY_SECONDS)
    wait_seconds += POLLING_FREQUENCY_SECONDS
## Run the Job on the new Engine, then tear down the AWS Deployment.
## The try/finally guarantees the Deployment (and its EC2 instances) is
## stopped and deleted however this section ends: normal completion, a
## timeout exit, or an exception such as an unknown JOB_ID. Previously a
## timeout waiting for ACTIVE, or any error here, left the instances running.
try:
    ## Get Job
    print_message('Getting Job with JOB_ID: \'' + JOB_ID + '\'')
    job = sch.jobs.get(job_id=JOB_ID)
    ## Set the Job's Runtime Parameters
    print_message('Setting the Job\'s runtime parameters')
    # NOTE: placeholder values; these replace the Job's runtime parameters.
    job.runtime_parameters = {"PARAM_1":"aaa","PARAM_2":"bbb"}
    ## Set the Job's Label so it runs on the ephemeral Engine
    print_message('Setting the Job\'s label to \'' + LABEL + '\'')
    job.data_collector_labels = [LABEL]
    ## Update the Job
    sch.update_job(job)
    ## Start the Job (log first so the message precedes the action)
    print_message('Starting Job...')
    sch.start_job(job)

    ## Wait for Job to transition to Active.
    ## The refresh happens after each sleep, so the loop condition always sees
    ## the latest status before a timeout is declared.
    job.refresh()
    wait_seconds = 0
    while job.status.status != 'ACTIVE':
        print_message('Waiting for Job to become ACTIVE')
        if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE:
            # Raises SystemExit; the finally clause still tears down the Deployment.
            sys.exit('Error: Timeout waiting for Job to become ACTIVE')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
        job.refresh()
    print_message('Job status is ACTIVE')

    ## Wait for Job to complete or to timeout.
    ## INACTIVE_ERROR is terminal too: a Job that stopped with an error never
    ## reaches INACTIVE, so waiting for INACTIVE alone would burn the whole timeout.
    finished_statuses = ('INACTIVE', 'INACTIVE_ERROR')
    print_message('Waiting for Job to complete...')
    job.refresh()
    wait_seconds = 0
    while job.status.status not in finished_statuses:
        if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE:
            print_message('Error: Timeout waiting for Job to complete')
            break
        print_message('Waiting for Job to complete...')
        sleep(POLLING_FREQUENCY_SECONDS)
        wait_seconds += POLLING_FREQUENCY_SECONDS
        # Refresh after sleeping so the status reported below is current.
        job.refresh()
    if job.status.status == 'INACTIVE':
        print_message('Job completed successfully')
    else:
        print_message('Error: Job did not complete successfully')
    print_message('Job status is ' + job.status.status)
finally:
    ## Stop the AWS Deployment
    print_message('Stopping AWS deployment')
    sch.stop_deployment(deployment)
    ## Delete the AWS Deployment
    print_message('Deleting AWS deployment')
    sch.delete_deployment(deployment)
print_message('Done')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment