@osvadimos, created August 3, 2016 07:20
Pipeline object creator: Java helpers that define and activate an AWS Data Pipeline which exports a DynamoDB table to S3 via an EMR cluster, with SNS notifications on success and failure.
// DDBExportPipelineCreator.java

import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.*;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;

/**
 * Builds and activates the AWS Data Pipeline definition for the DynamoDB-to-S3 export.
 */
public class DDBExportPipelineCreator {

    /**
     * Uploads the pipeline objects and parameter values to an already created pipeline.
     */
    public static PutPipelineDefinitionResult putPipelineDefinition(final DataPipelineClient dataPipelineClient,
                                                                    final String pipelineId,
                                                                    final Map<String, String> params) {
        List<PipelineObject> pipelineObjectList = getPipelineObjects();
        List<ParameterValue> parameterValues = getParameterValues(params);
        PutPipelineDefinitionRequest putPipelineDefinition = new PutPipelineDefinitionRequest()
                .withPipelineId(pipelineId)
                .withParameterValues(parameterValues)
                .withPipelineObjects(pipelineObjectList);
        PutPipelineDefinitionResult putPipelineResult = dataPipelineClient.putPipelineDefinition(putPipelineDefinition);
        if (putPipelineResult.isErrored()) {
            // Details are available via putPipelineResult.getValidationErrors().
            throw new RuntimeException("Error in pipeline definition.");
        }
        return putPipelineResult;
    }

    /**
     * Assembles the pipeline objects: default settings, DynamoDB source, S3 target,
     * EMR cluster, EMR export activity and the SNS success/failure notifications.
     */
    private static List<PipelineObject> getPipelineObjects() {
        PipelineObject defaultObject = DDBExportPipelineObjectCreator.getDefault();
        PipelineObject ddbSourceTable = DDBExportPipelineObjectCreator.getDDBSourceTable();
        PipelineObject s3BackupLocation = DDBExportPipelineObjectCreator.getS3BackupLocation();
        PipelineObject emrCluster = DDBExportPipelineObjectCreator.getEMRCluster();
        PipelineObject emrActivity = DDBExportPipelineObjectCreator.getEMRActivity();
        PipelineObject snsSuccess = DDBExportPipelineObjectCreator.getSNSSuccessActivity();
        PipelineObject snsFail = DDBExportPipelineObjectCreator.getSNSFailActivity();
        return Lists.newArrayList(
                defaultObject,
                ddbSourceTable,
                s3BackupLocation,
                emrCluster,
                emrActivity,
                snsSuccess,
                snsFail);
    }

    /**
     * Maps the caller-supplied values onto the parameter ids referenced by the pipeline objects.
     * Expects the keys "region", "myDDBTableName", "myOutputS3Location", "myLogsS3Location",
     * "myTopicSuccess" and "myTopicFail".
     */
    private static List<ParameterValue> getParameterValues(final Map<String, String> params) {
        ParameterValue ddbRegion = new ParameterValue()
                .withId("myDDBRegion")
                .withStringValue(params.get("region"));
        ParameterValue pipeDDBTableName = new ParameterValue()
                .withId("myDDBTableName")
                .withStringValue(params.get("myDDBTableName"));
        ParameterValue pipeDDBReadThroughputRatio = new ParameterValue()
                .withId("myDDBReadThroughputRatio")
                .withStringValue("0.25");
        ParameterValue pipeOutputS3Location = new ParameterValue()
                .withId("myOutputS3Location")
                .withStringValue(params.get("myOutputS3Location"));
        ParameterValue pipeLogsS3Location = new ParameterValue()
                .withId("myLogsS3Location")
                .withStringValue(params.get("myLogsS3Location"));
        ParameterValue myResizeClusterBeforeRunning = new ParameterValue()
                .withId("myResizeClusterBeforeRunning")
                .withStringValue("true");
        ParameterValue mySuccessARN = new ParameterValue()
                .withId("myTopicSuccess")
                .withStringValue(params.get("myTopicSuccess"));
        ParameterValue myFailARN = new ParameterValue()
                .withId("myTopicFail")
                .withStringValue(params.get("myTopicFail"));
        return Lists.newArrayList(
                ddbRegion,
                pipeDDBTableName,
                pipeDDBReadThroughputRatio,
                pipeOutputS3Location,
                myResizeClusterBeforeRunning,
                pipeLogsS3Location,
                mySuccessARN,
                myFailARN);
    }

    /**
     * Activates the pipeline; with scheduleType "ondemand" this triggers a single run.
     */
    public static ActivatePipelineResult activatePipeline(final DataPipelineClient dataPipelineClient,
                                                          final String pipelineId) {
        ActivatePipelineRequest activatePipelineRequest = new ActivatePipelineRequest()
                .withPipelineId(pipelineId);
        return dataPipelineClient.activatePipeline(activatePipelineRequest);
    }
}
// DDBExportPipelineObjectCreator.java

import com.amazonaws.services.datapipeline.model.Field;
import com.amazonaws.services.datapipeline.model.PipelineObject;
import com.google.common.collect.Lists;

import java.util.List;

/**
 * Factory for the individual pipeline objects that make up the DynamoDB export pipeline.
 * Values of the form "#{...}" are Data Pipeline expressions resolved from the parameter
 * values supplied by DDBExportPipelineCreator.
 */
public class DDBExportPipelineObjectCreator {

    /** Default object: on-demand schedule, IAM roles and the pipeline log location. */
    public static PipelineObject getDefault() {
        String name = "Default";
        String id = "Default";
        Field scheduleType = new Field()
                .withKey("scheduleType")
                .withStringValue("ondemand");
        Field role = new Field()
                .withKey("role")
                .withStringValue("DataPipelineDefaultRole");
        Field resourceRole = new Field()
                .withKey("resourceRole")
                .withStringValue("DataPipelineDefaultResourceRole");
        Field pipelineLogURI = new Field()
                .withKey("pipelineLogUri")
                .withStringValue("#{myLogsS3Location}");
        List<Field> fieldsList = Lists.newArrayList(
                scheduleType,
                role,
                resourceRole,
                pipelineLogURI);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }

    /** SNS notification sent when the backup activity succeeds. */
    public static PipelineObject getSNSSuccessActivity() {
        String name = "SuccessNotify";
        String id = "SuccessNotify";
        Field type = new Field()
                .withKey("type")
                .withStringValue("SnsAlarm");
        Field topicArn = new Field()
                .withKey("topicArn")
                .withStringValue("#{myTopicSuccess}");
        Field role = new Field()
                .withKey("role")
                .withStringValue("DataPipelineDefaultRole");
        Field subject = new Field()
                .withKey("subject")
                .withStringValue("COPY SUCCESS: #{node.@scheduledStartTime}");
        Field message = new Field()
                .withKey("message")
                .withStringValue("#{myDDBTableName}");
        List<Field> fieldsList = Lists.newArrayList(
                type,
                topicArn,
                role,
                subject,
                message);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }

    /** SNS notification sent when the backup activity fails. */
    public static PipelineObject getSNSFailActivity() {
        String name = "FailureNotify";
        String id = "FailureNotify";
        Field type = new Field()
                .withKey("type")
                .withStringValue("SnsAlarm");
        Field topicArn = new Field()
                .withKey("topicArn")
                .withStringValue("#{myTopicFail}");
        Field role = new Field()
                .withKey("role")
                .withStringValue("DataPipelineDefaultRole");
        Field subject = new Field()
                .withKey("subject")
                .withStringValue("FAIL: #{node.@scheduledStartTime}");
        Field message = new Field()
                .withKey("message")
                .withStringValue("#{myDDBTableName}");
        List<Field> fieldsList = Lists.newArrayList(
                type,
                role,
                topicArn,
                subject,
                message);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }

    /** DynamoDB source table, throttled to the configured read throughput ratio. */
    public static PipelineObject getDDBSourceTable() {
        String name = "DDBSourceTable";
        String id = "DDBSourceTable";
        Field type = new Field()
                .withKey("type")
                .withStringValue("DynamoDBDataNode");
        Field tableName = new Field()
                .withKey("tableName")
                .withStringValue("#{myDDBTableName}");
        Field readThroughputPercent = new Field()
                .withKey("readThroughputPercent")
                .withStringValue("#{myDDBReadThroughputRatio}");
        List<Field> fieldsList = Lists.newArrayList(tableName, type, readThroughputPercent);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }

    /** S3 output location; each run writes into a timestamped sub-directory. */
    public static PipelineObject getS3BackupLocation() {
        String name = "S3BackupLocation";
        String id = "S3BackupLocation";
        Field type = new Field()
                .withKey("type")
                .withStringValue("S3DataNode");
        Field directoryPath = new Field()
                .withKey("directoryPath")
                .withStringValue("#{myOutputS3Location}#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}");
        Field onFail = new Field()
                .withKey("onFail")
                .withRefValue("FailureNotify");
        Field onSuccess = new Field()
                .withKey("onSuccess")
                .withRefValue("SuccessNotify");
        List<Field> fieldsList = Lists.newArrayList(
                type,
                directoryPath,
                onFail,
                onSuccess);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }

    /** EMR cluster that runs the export; the bootstrap action tunes YARN/MapReduce memory settings. */
    public static PipelineObject getEMRCluster() {
        String name = "EmrClusterForBackup";
        String id = "EmrClusterForBackup";
        Field type = new Field()
                .withKey("type")
                .withStringValue("EmrCluster");
        Field amiVersion = new Field()
                .withKey("amiVersion")
                .withStringValue("3.10.0");
        Field masterInstanceType = new Field()
                .withKey("masterInstanceType")
                .withStringValue("m3.xlarge");
        Field coreInstanceType = new Field()
                .withKey("coreInstanceType")
                .withStringValue("m3.xlarge");
        Field coreInstanceCount = new Field()
                .withKey("coreInstanceCount")
                .withStringValue("1");
        Field region = new Field()
                .withKey("region")
                .withStringValue("#{myDDBRegion}");
        Field terminateAfter = new Field()
                .withKey("terminateAfter")
                .withStringValue("12 hours");
        Field bootstrapAction = new Field()
                .withKey("bootstrapAction")
                .withStringValue("s3://elasticmapreduce" +
                        "/bootstrap-actions/configure-hadoop, --yarn-key-value,yarn.nodemanager.resource.memory-mb=11520," +
                        "--yarn-key-value,yarn.scheduler.maximum-allocation-mb=11520," +
                        "--yarn-key-value,yarn.scheduler.minimum-allocation-mb=1440," +
                        "--yarn-key-value,yarn.app.mapreduce.am.resource.mb=2880," +
                        "--mapred-key-value,mapreduce.map.memory.mb=5760," +
                        "--mapred-key-value,mapreduce.map.java.opts=-Xmx4608M," +
                        "--mapred-key-value,mapreduce.reduce.memory.mb=2880," +
                        "--mapred-key-value,mapreduce.reduce.java.opts=-Xmx2304m," +
                        "--mapred-key-value,mapreduce.map.speculative=false");
        List<Field> fieldsList = Lists.newArrayList(
                type,
                amiVersion,
                masterInstanceType,
                coreInstanceCount,
                coreInstanceType,
                region,
                terminateAfter,
                bootstrapAction);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }

    /** EmrActivity that runs the emr-ddb export tool from the source table to the S3 backup location. */
    public static PipelineObject getEMRActivity() {
        String name = "TableBackupActivity";
        String id = "TableBackupActivity";
        Field type = new Field()
                .withKey("type")
                .withStringValue("EmrActivity");
        Field input = new Field()
                .withKey("input")
                .withRefValue("DDBSourceTable");
        Field output = new Field()
                .withKey("output")
                .withRefValue("S3BackupLocation");
        Field runsOn = new Field()
                .withKey("runsOn")
                .withRefValue("EmrClusterForBackup");
        Field resizeClusterBeforeRunning = new Field()
                .withKey("resizeClusterBeforeRunning")
                .withStringValue("#{myResizeClusterBeforeRunning}");
        Field maximumRetries = new Field()
                .withKey("maximumRetries")
                .withStringValue("2");
        Field step = new Field()
                .withKey("step")
                .withStringValue("s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-" +
                        "handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport," +
                        "#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}");
        List<Field> fieldsList = Lists.newArrayList(
                type,
                input,
                output,
                runsOn,
                resizeClusterBeforeRunning,
                maximumRetries,
                step);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
}
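
Usage sketch (not part of the original gist): a minimal example of how the two classes above might be wired together with the AWS SDK for Java v1. The runner class name, table name, bucket paths, SNS topic ARNs and region are placeholders; credentials come from the SDK's default provider chain, and the pipeline is created first because putPipelineDefinition expects an existing pipeline id.

// DDBExportPipelineRunner.java (illustrative only)

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.CreatePipelineRequest;

import java.util.HashMap;
import java.util.Map;

public class DDBExportPipelineRunner {

    public static void main(String[] args) {
        // Uses the default credential provider chain; region is set explicitly (placeholder).
        DataPipelineClient client = new DataPipelineClient();
        client.setRegion(Region.getRegion(Regions.EU_WEST_1));

        // Create an empty pipeline; Data Pipeline uses uniqueId to keep repeated calls idempotent.
        String pipelineId = client.createPipeline(new CreatePipelineRequest()
                .withName("ddb-export")
                .withUniqueId("ddb-export-backup"))
                .getPipelineId();

        // Keys expected by DDBExportPipelineCreator.getParameterValues(...); all values are placeholders.
        Map<String, String> params = new HashMap<>();
        params.put("region", "eu-west-1");
        params.put("myDDBTableName", "my-table");
        params.put("myOutputS3Location", "s3://my-backup-bucket/exports/");
        params.put("myLogsS3Location", "s3://my-backup-bucket/logs/");
        params.put("myTopicSuccess", "arn:aws:sns:eu-west-1:123456789012:backup-success");
        params.put("myTopicFail", "arn:aws:sns:eu-west-1:123456789012:backup-fail");

        // Upload the definition and trigger the on-demand run.
        DDBExportPipelineCreator.putPipelineDefinition(client, pipelineId, params);
        DDBExportPipelineCreator.activatePipeline(client, pipelineId);
    }
}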