@enryold · Created February 21, 2017 15:39
DynamoDBtoRedshiftHiveCSV - from: https://github.com/awslabs/data-pipeline-samples
{
  "objects": [
    {
      "myComment": "This object sets the default configuration for all objects in the pipeline.",
      "id": "Default",
      "name": "Default",
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "scheduleType": "cron",
      "pipelineLogUri": "#{myLogUri}"
    },
    {
      "myComment": "The DynamoDB table from which we export data.",
      "id": "DynamoDBInputDataNode",
      "name": "DynamoDB",
      "dataFormat": {
        "ref": "DDBExportFormat"
      },
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "myComment": "The S3 path to which we export data.",
      "id": "S3StagingDataNode",
      "name": "S3StagingDataNode",
      "directoryPath": "#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}/",
      "dataFormat": {
        "ref": "S3StagingDataFormat"
      },
      "type": "S3DataNode"
    },
    {
      "myComment": "Format for the S3 path.",
      "id": "S3StagingDataFormat",
      "name": "S3StagingDataFormat",
      "column": "not_used STRING",
      "type": "CSV"
    },
    {
      "myComment": "Format for the DynamoDB table.",
      "id": "DDBExportFormat",
      "name": "DDBExportFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    },
    {
      "myComment": "Activity that runs the Hive script to export data to CSV.",
      "id": "TableBackupStagingActivity",
      "name": "TableBackupStagingActivity",
      "input": {
        "ref": "DynamoDBInputDataNode"
      },
      "output": {
        "ref": "S3StagingDataNode"
      },
      "hiveScript": "DROP TABLE IF EXISTS tempHiveTable;\n\nDROP TABLE IF EXISTS s3TempTable;\n\nCREATE EXTERNAL TABLE tempHiveTable (#{myS3SourceColMapping})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{myDDBTableName}\", \"dynamodb.column.mapping\" = \"#{myDDBTableColMapping}\");\n\nCREATE EXTERNAL TABLE s3TempTable (#{myS3TargetColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'\nLOCATION '#{myOutputS3Loc}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}';\n\nINSERT OVERWRITE TABLE s3TempTable SELECT #{myHiveSelectColumns} FROM tempHiveTable;",
      "runsOn": { "ref": "EmrCluster1" },
      "type": "HiveActivity"
    },
    {
      "myComment": "This object specifies the copy activity that moves data from S3 to Redshift.",
      "id": "RedshiftLoadActivity",
      "name": "RedshiftLoadActivity",
      "input": {
        "ref": "S3StagingDataNode"
      },
      "output": {
        "ref": "RedshiftCluster1"
      },
      "dependsOn": {
        "ref": "TableBackupStagingActivity"
      },
      "runsOn": {
        "ref": "EmrCluster1"
      },
      "type": "RedshiftCopyActivity",
      "insertMode": "TRUNCATE"
    },
    {
      "myComment": "This object controls the task schedule.",
      "id": "DefaultSchedule",
      "name": "RunOnce",
      "occurrences": "1",
      "period": "1 Day",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME"
    },
    {
      "myComment": "This object provides connection information for the Redshift cluster.",
      "id": "RedshiftDatabase1",
      "name": "RedshiftDatabase1",
      "connectionString": "jdbc:postgresql://#{myRedshiftEndpoint}:5439/dev",
      "type": "RedshiftDatabase",
      "username": "#{myRedshiftUsername}",
      "*password": "#{*myRedshiftPassword}"
    },
    {
      "myComment": "This object provides the configuration for the EMR cluster.",
      "id": "EmrCluster1",
      "name": "EmrCluster1",
      "enableDebugging": "true",
      "coreInstanceCount": "3",
      "coreInstanceType": "m3.xlarge",
      "releaseLabel": "emr-4.4.0",
      "masterInstanceType": "m3.xlarge",
      "type": "EmrCluster",
      "terminateAfter": "1 Week",
      "applications": "hive"
    },
    {
      "myComment": "This object describes the target table in the Redshift database.",
      "id": "RedshiftCluster1",
      "name": "RedshiftCluster1",
      "createTableSql": "#{myRedshiftCreateTableQuery}",
      "database": {
        "ref": "RedshiftDatabase1"
      },
      "primaryKeys": ["#{myRedshiftPrimaryKeys}"],
      "type": "RedshiftDataNode",
      "tableName": "#{myRedshiftTable}"
    }
  ],
  "parameters": [
    {
      "description": "S3 directory to which pipeline logs are written.",
      "id": "myLogUri",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "The name of the source table in DynamoDB.",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "description": "S3 directory where staging data will be stored.",
      "id": "myOutputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "The mapping between the projected columns in the source table in Hive and their data types.",
      "id": "myS3SourceColMapping",
      "type": "String"
    },
    {
      "description": "The mapping between the columns in the table in DynamoDB and the columns referenced in the Hive query for data export.",
      "id": "myDDBTableColMapping",
      "type": "String"
    },
    {
      "description": "The mapping between the projected columns in the target table in Hive and their data types.",
      "id": "myS3TargetColMapping",
      "type": "String"
    },
    {
      "description": "The list of columns in the Hive SELECT query.",
      "id": "myHiveSelectColumns",
      "type": "String"
    },
    {
      "description": "The SQL statement used to create the target table in Redshift if it does not already exist.",
      "id": "myRedshiftCreateTableQuery",
      "type": "String"
    },
    {
      "description": "The target Redshift table.",
      "id": "myRedshiftTable",
      "type": "String"
    },
    {
      "description": "One or more columns that make up the primary key of the target table.",
      "id": "myRedshiftPrimaryKeys",
      "type": "String"
    },
    {
      "description": "The username to use for connecting to the Redshift cluster.",
      "id": "myRedshiftUsername",
      "type": "String"
    },
    {
      "description": "The password for the above user, used to connect to the Redshift cluster.",
      "id": "*myRedshiftPassword",
      "type": "String"
    },
    {
      "description": "The endpoint for the Redshift cluster.",
      "id": "myRedshiftEndpoint",
      "type": "String"
    }
  ]
}
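
Every #{...} expression above is resolved from the "parameters" section when the pipeline is activated. As a minimal sketch of how the pieces fit together, the following parameter-values file uses the {"values": {...}} format that the AWS CLI's put-pipeline-definition command accepts via its --parameter-values-uri option. Every bucket, table, column, and endpoint name below is hypothetical, and since JSON permits no comments, treat each value strictly as a placeholder.

{
  "values": {
    "myLogUri": "s3://example-bucket/datapipeline/logs/",
    "myDDBTableName": "Orders",
    "myOutputS3Loc": "s3://example-bucket/datapipeline/staging",
    "myS3SourceColMapping": "id string, customer string, total bigint",
    "myDDBTableColMapping": "id:id,customer:customer,total:total",
    "myS3TargetColMapping": "id string, customer string, total bigint",
    "myHiveSelectColumns": "id, customer, total",
    "myRedshiftCreateTableQuery": "CREATE TABLE IF NOT EXISTS orders (id varchar(64) NOT NULL, customer varchar(256), total bigint)",
    "myRedshiftTable": "orders",
    "myRedshiftPrimaryKeys": "id",
    "myRedshiftUsername": "pipeline_user",
    "*myRedshiftPassword": "REPLACE_ME",
    "myRedshiftEndpoint": "examplecluster.abc123xyz.us-east-1.redshift.amazonaws.com"
  }
}

Note that the mapping parameters must stay mutually consistent: myS3SourceColMapping and myS3TargetColMapping are Hive column definitions spliced into the two CREATE EXTERNAL TABLE statements, myDDBTableColMapping pairs each Hive column with its DynamoDB attribute (hiveColumn:dynamoDBAttribute), and myHiveSelectColumns names the same columns for the final SELECT. A file in this shape can be supplied with aws datapipeline put-pipeline-definition --pipeline-id <id> --pipeline-definition file://pipeline.json --parameter-values-uri file://values.json (or the values can be passed inline with --parameter-values), followed by aws datapipeline activate-pipeline.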