Python Script to run a StreamSets DataOps Platform Job on a dynamically deployed Engine on AWS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
'''This Python script creates and starts a DataOps Platform AWS Deployment of a single
SDC Engine, starts a Job that runs on that Engine, waits for the Job to complete, then
stops and deletes the SDC Engine and Deployment.

Prerequisites:
- An AWS Environment configured in DataOps Platform in an Active state
- A Python 3.4 or higher environment to run this script
- The StreamSets SDK for Python module installed in your Python environment, see
  https://docs.streamsets.com/platform-sdk/latest/learn/installation.html
- A StreamSets batch Job to be run on the Engine
'''
# Imports | |
import datetime, sys | |
from time import sleep | |
from streamsets.sdk import ControlHub | |
## USER VARIABLES ##############################################

# --- Control Hub credentials and Job selection ---

# API Credential ID used to authenticate against Control Hub
CRED_ID = ''

# API Credential token paired with CRED_ID
CRED_TOKEN = ''

# ID of the Job this script starts
JOB_ID = ''

# Label shared by the Deployment (as its name and tag) and the Job, so the Job
# is matched to the ephemeral SDC Instance
LABEL = ''

# --- AWS / EC2 settings ---

# Name of the AWS Environment to look up in Control Hub
AWS_ENVIRONMENT_NAME = ''

# Name of the AWS SSH key pair assigned to the EC2 instances
AWS_KEY_NAME = ''

# Tags applied to the AWS Deployment
AWS_DEPLOYMENT_TAGS = {'key': 'value'}

# ARN of the AWS Instance Profile used by the EC2 instances
EC2_INSTANCE_PROFILE = ''

# EC2 Instance Type
EC2_INSTANCE_TYPE = ''

# How many EC2 Instances to spin up
NUM_INSTANCES = 1

# Extra Stage Libs installed in addition to the base set added by the script
SDC_STAGE_LIBS = ['jython_2_7', 'jdbc']

# --- Timing (all values in seconds) ---

# Upper bound on waiting for the Engine Deployment to complete
MAX_WAIT_SECONDS_FOR_ENGINE_DEPLOYMENT = 5 * 60  # example for 5 minutes

# Pause between successive status polls of Control Hub
POLLING_FREQUENCY_SECONDS = 30

# Upper bound on waiting for the Job to become Active
MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE = 20

# Upper bound on waiting for the Job to complete
MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE = 10 * 60  # example for 10 minutes

##############################################
def print_message(message):
    '''Print message to stdout, prefixed with the current local time.

    The prefix has the form "YYYY-MM-DD HH:MM:SS" and is separated from the
    message by a single space. message must be a str.
    '''
    timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(' '.join((timestamp, message)))
# Init connection to Control Hub
print_message('Connecting to Control Hub')
sch = ControlHub(credential_id=CRED_ID, token=CRED_TOKEN)

# Get the Environment: the first Control Hub environment that is of type 'AWS'
# and whose name equals AWS_ENVIRONMENT_NAME, or None when there is no match
environment = next(
    (env for env in sch.environments
     if env.environment_type == 'AWS'
     and env.environment_name == AWS_ENVIRONMENT_NAME),
    None
)

if environment is None:
    print_message("ERROR: could not find AWS environment named '{}'".format(AWS_ENVIRONMENT_NAME))
    # Use sys.exit rather than the builtin exit(): the latter is a helper the
    # site module installs for interactive sessions and is not guaranteed to
    # exist in every interpreter setup. The -1 status is kept unchanged.
    sys.exit(-1)

print_message("Found AWS environment named '{}'".format(AWS_ENVIRONMENT_NAME))
print_message("Creating AWS Deployment named '{}'".format(LABEL))

# Obtain a builder for EC2 (AWS) Deployments from Control Hub
ec2_deployment_builder = sch.get_deployment_builder(deployment_type='EC2')

# Build the Deployment definition; LABEL doubles as its name and its only tag
deployment = ec2_deployment_builder.build(
    deployment_name=LABEL,
    environment=environment,
    engine_type='DC',
    engine_version='5.2.0',
    deployment_tags=[LABEL]
)

# Apply the user-supplied EC2 settings, in a fixed order
ec2_settings = (
    ('engine_instances', NUM_INSTANCES),
    ('ec2_instance_type', EC2_INSTANCE_TYPE),
    ('ssh_key_source', 'EXISTING_KEY_PAIR_NAME'),
    ('key_pair_name', AWS_KEY_NAME),
    ('aws_tags', AWS_DEPLOYMENT_TAGS),
    ('instance_profile', EC2_INSTANCE_PROFILE),
)
for setting_name, setting_value in ec2_settings:
    setattr(deployment, setting_name, setting_value)

# Register the Deployment with Control Hub before touching its engine configuration
sch.add_deployment(deployment)

# Start from the base Stage Libs that should always be present ...
deployment.engine_configuration.stage_libs = ['dataformats', 'dev', 'basic', 'stats']

# ... then append the user's custom Stage Libs
print_message('Adding Stage Libraries to the Deployment: {}'.format(SDC_STAGE_LIBS))
deployment.engine_configuration.stage_libs.extend(SDC_STAGE_LIBS)

# Push the Stage Lib changes to Control Hub
sch.update_deployment(deployment)

# Start the Deployment
print_message('Starting the AWS Deployment. This may take a couple of minutes...')
sch.start_deployment(deployment)
## Wait for the new SDC instance to register with Control Hub
# Poll Control Hub until some Data Collector reports LABEL among its labels,
# giving up once the accumulated wait exceeds the configured maximum.
wait_seconds = 0
while True:
    print_message('Waiting for Engine to register with Control Hub...')

    label_reported = any(
        reported_label == LABEL
        for sdc in sch.data_collectors
        for reported_label in sdc.reported_labels
    )
    if label_reported:
        print_message('Engine has registered with Control Hub')
        break

    if wait_seconds > MAX_WAIT_SECONDS_FOR_ENGINE_DEPLOYMENT:
        # The Deployment is intentionally NOT deleted on this path, so that a
        # failed Deployment can still be examined in the Control Hub UI for
        # misconfiguration. For automatic clean up, call
        # sch.delete_deployment(deployment) before exiting.
        sys.exit('Error: Timeout waiting for Engine to register with Control Hub. ')

    sleep(POLLING_FREQUENCY_SECONDS)
    wait_seconds += POLLING_FREQUENCY_SECONDS
## Get Job
print_message("Getting Job with JOB_ID: '{}'".format(JOB_ID))
job = sch.jobs.get(job_id=JOB_ID)

## Set the Job's Runtime Parameters
print_message('Setting the Job\'s runtime parameters')
job.runtime_parameters = {"PARAM_1": "aaa", "PARAM_2": "bbb"}

## Set the Job's Label so that it matches the label given to the Deployment
print_message("Setting the Job's label to '{}'".format(LABEL))
job.data_collector_labels = [LABEL]

## Update the Job so Control Hub sees the new parameters and label
sch.update_job(job)

## Start the Job
# Announce the start before issuing the call, so the log line precedes the
# action it describes and is still emitted if the call itself fails.
print_message('Starting Job...')
sch.start_job(job)
## Wait for Job to transition to Active
# Each pass sleeps first and refreshes afterwards, so both the loop condition
# and the timeout check always look at the most recent status. Refreshing
# before the sleep would let the script time out on a status that is one full
# polling interval old, even if the Job had become ACTIVE in the meantime.
job.refresh()
wait_seconds = 0
while job.status.status != 'ACTIVE':
    print_message('Waiting for Job to become ACTIVE')
    if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_BECOME_ACTIVE:
        # NOTE: exiting here skips the stop/delete steps at the end of the
        # script, so the Deployment is left in place.
        sys.exit('Error: Timeout waiting for Job to become ACTIVE')
    sleep(POLLING_FREQUENCY_SECONDS)
    wait_seconds += POLLING_FREQUENCY_SECONDS
    job.refresh()
print_message('Job status is ACTIVE')
## Wait for Job to complete or to timeout
# The status is refreshed after every sleep, so the loop condition, the
# timeout decision and the final verdict below all use the latest status
# rather than one captured before the last polling interval.
print_message('Waiting for Job to complete...')
job.refresh()
wait_seconds = 0
while job.status.status != 'INACTIVE':
    if wait_seconds > MAX_WAIT_SECONDS_FOR_JOB_TO_COMPLETE:
        print_message('Error: Timeout waiting for Job to complete')
        break
    print_message('Waiting for Job to complete...')
    sleep(POLLING_FREQUENCY_SECONDS)
    wait_seconds += POLLING_FREQUENCY_SECONDS
    job.refresh()

# NOTE(review): 'INACTIVE' is treated as success here; confirm whether a failed
# run can also report this status before relying on the message below.
if job.status.status == 'INACTIVE':
    print_message('Job completed successfully')
else:
    print_message('Error: Job did not complete successfully')
print_message('Job status is ' + job.status.status)
## Stop, then delete, the AWS Deployment
# Each step is announced before the corresponding Control Hub call is made.
teardown_steps = (
    ('Stopping AWS deployment', sch.stop_deployment),
    ('Deleting AWS deployment', sch.delete_deployment),
)
for notice, teardown_call in teardown_steps:
    print_message(notice)
    teardown_call(deployment)

print_message('Done')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment