AWS Data Pipeline sample CSV on S3 to DynamoDB

AWS Data Pipeline

Original content: https://github.com/aws-samples/data-pipeline-samples/tree/master/samples/DynamoDBImportCSV

Prerequisites

IAM

  • Roles:
  • [DataPipelineDefaultRole, AWSDataPipelineRole]
  • [DataPipelineDefaultResourceRole, AmazonEC2RoleforDataPipelineRole]
  • Group:
  • [DataPipelineDevelopers, AWSDataPipeline_FullAccess]

Ref. https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-get-setup.html

  • Custom IAM roles
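A hedged CLI sketch of setting up the roles and group listed above; the trust-policy documents and the managed-policy ARN paths are assumptions, so verify them against the setup guide linked above:

# Create the pipeline role and attach its managed policy (trust policy is an assumption)
aws iam create-role --role-name DataPipelineDefaultRole \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"datapipeline.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name DataPipelineDefaultRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSDataPipelineRole

# Create the EC2 resource role, attach its policy, and expose it as an instance profile
aws iam create-role --role-name DataPipelineDefaultResourceRole \
  --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"ec2.amazonaws.com"},"Action":"sts:AssumeRole"}]}'
aws iam attach-role-policy --role-name DataPipelineDefaultResourceRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforDataPipelineRole
aws iam create-instance-profile --instance-profile-name DataPipelineDefaultResourceRole
aws iam add-role-to-instance-profile --instance-profile-name DataPipelineDefaultResourceRole \
  --role-name DataPipelineDefaultResourceRole

# Group for console users
aws iam create-group --group-name DataPipelineDevelopers
aws iam attach-group-policy --group-name DataPipelineDevelopers \
  --policy-arn arn:aws:iam::aws:policy/AWSDataPipeline_FullAccess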

VPC

Do not create the VPC with the wizard; instead recreate a default VPC, since I deleted the default VPC in us-west-2 while doing hands-on practice for a previous certification exam.

Ref. https://docs.aws.amazon.com/vpc/latest/userguide/default-vpc.html#create-default-vpc
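Recreating the default VPC does not need the console wizard; a minimal CLI sketch:

# Recreates the default VPC (and its default subnets) in us-west-2
aws ec2 create-default-vpc --region us-west-2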

SNS

  • [aws-workshop, workshop]
  • [ARN_FOR_CREATED_SNS_TOPIC]

Ref. https://docs.aws.amazon.com/sns/latest/dg/sns-getting-started.html
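A hedged sketch of creating the topic and an email subscription; reading [aws-workshop, workshop] as topic name plus display name is an assumption, and the email address is a placeholder:

# Create the topic (the call returns ARN_FOR_CREATED_SNS_TOPIC)
aws sns create-topic --name aws-workshop --attributes DisplayName=workshop --region us-west-2
# Subscribe an email endpoint; the confirmation link arrives by email
aws sns subscribe --topic-arn ARN_FOR_CREATED_SNS_TOPIC --protocol email \
  --notification-endpoint YOUR_EMAIL_HERE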

DynamoDB

  • aws-workshop-dynamodb, partition key: id (Number)

  • tags: [env: dev, user: SET_USER_HERE]

  • submit init data

aws dynamodb put-item --table-name aws-workshop-dynamodb --item file://aws-workshop-dynamodb-init-data.json --return-consumed-capacity TOTAL
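The table has to exist before the put-item call above succeeds; a hedged sketch of creating it with id as a numeric partition key (the throughput values are assumptions):

aws dynamodb create-table --table-name aws-workshop-dynamodb \
  --attribute-definitions AttributeName=id,AttributeType=N \
  --key-schema AttributeName=id,KeyType=HASH \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  --region us-west-2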

Ref. https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-prereq.html

https://docs.aws.amazon.com/cli/latest/reference/dynamodb/put-item.html

S3

  • hello-aws-workshop-us-west-2-201901, us-west-2
  • tags: [env: dev, user: SET_USER_HERE]
  • folder: [data, log]
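A hedged CLI sketch of creating the bucket and uploading the sample CSV into the data folder; the local CSV file name is a placeholder:

aws s3 mb s3://hello-aws-workshop-us-west-2-201901 --region us-west-2
# Upload the sample CSV (local file name is a placeholder)
aws s3 cp ./bicycles.csv s3://hello-aws-workshop-us-west-2-201901/data/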

Access to the data folder:

https://s3.console.aws.amazon.com/s3/buckets/hello-aws-workshop-us-west-2-201901/data/?region=us-west-2&tab=overview

Ref. https://docs.aws.amazon.com/AmazonS3/latest/gsg/GetStartedWithS3.html

Data Pipeline

  • aws-workshop-data-pipeline
  • [s3://us-west-2-aws-workshop-s3-bucket/data/, aws-workshop-dynamodb, us-west-2, s3://us-west-2-aws-workshop-s3-bucket/log/]
  • tags: [env: dev, user: SET_USER_HERE]

https://github.com/aws-samples/data-pipeline-samples/blob/master/samples/DynamoDBImportCSV/CSVtoDynamoDB.json
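If the pipeline is created from the CLI rather than the console, a hedged sketch could look like the following; the pipeline id is a placeholder returned by create-pipeline, and CSVtoDynamoDB.json refers to the definition shown further below:

aws datapipeline create-pipeline --name aws-workshop-data-pipeline --unique-id aws-workshop-data-pipeline
aws datapipeline put-pipeline-definition --pipeline-id df-EXAMPLE_PIPELINE_ID \
  --pipeline-definition file://CSVtoDynamoDB.json \
  --parameter-values \
    myInputS3Loc=s3://us-west-2-aws-workshop-s3-bucket/data/ \
    myDDBTableName=aws-workshop-dynamodb \
    myLogUri=s3://us-west-2-aws-workshop-s3-bucket/log/
aws datapipeline activate-pipeline --pipeline-id df-EXAMPLE_PIPELINE_ID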

Hive Script

DROP TABLE IF EXISTS tempHiveTable;
DROP TABLE IF EXISTS s3TempTable;

CREATE EXTERNAL TABLE tempHiveTable (#{myDDBColDefn})
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
TBLPROPERTIES ("dynamodb.table.name" = "#{myDDBTableName}", "dynamodb.column.mapping" = "#{myDDBTableColMapping}");

CREATE EXTERNAL TABLE s3TempTable (#{myS3ColMapping})
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
LOCATION '#{myInputS3Loc}'
TBLPROPERTIES ("skip.header.line.count"="1");

INSERT OVERWRITE TABLE tempHiveTable SELECT * FROM s3TempTable;

Ref.

https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/EMRforDynamoDB.ExternalTableForDDB.html#EMRforDynamoDB.ExternalTableForDDB.DataTypes

https://stackoverflow.com/questions/15751999/hive-external-table-skip-first-row

EMR cluster

Ref.

https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-emrcluster.html

https://aws.amazon.com/emr/pricing/

Clean up

Unsubscribe the notification

  • unsubscribe from the SNS notification by clicking the link in the email body

Data pipeline

  • delete the data pipeline

DynamoDB for data pipeline

  • delete the DynamoDB table

SNS topic for data pipeline

  • delete the topic for the SnsAlarm
  • delete the topic for the DynamoDB alert

Ref. https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-importexport-ddb-prereq.html#dp-importexport-ddb-table-cli
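A hedged CLI sketch of the same cleanup steps; the pipeline id and the ARNs are placeholders:

aws datapipeline delete-pipeline --pipeline-id df-EXAMPLE_PIPELINE_ID
aws dynamodb delete-table --table-name aws-workshop-dynamodb --region us-west-2
aws sns unsubscribe --subscription-arn ARN_FOR_EMAIL_SUBSCRIPTION
aws sns delete-topic --topic-arn ARN_FOR_CREATED_SNS_TOPIC
aws sns delete-topic --topic-arn ARN_FOR_DYNAMODB_ALERT_TOPIC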

{"Brand":{"S":"Brand-Company C"},"Price":{"N":"500"},"ProductCategory":{"S":"Bicycle"},"Title":{"S":"20-Bike-205"},"id":{"N":"1"},"BicycleType":{"S":"hybrid"}}
2 hybrid Brand-Company A 400 Bicycle 40-Bike-410
3 hybrid Brand-Company D 300 Bicycle 30-Bike-300
Pipeline definition (cf. CSVtoDynamoDB.json):

{
  "objects": [
    {
      "myComment": "Activity used to run the hive script to import CSV data",
      "output": {
        "ref": "DataNodeId_cnlSW"
      },
      "input": {
        "ref": "DataNodeId_1ERqq"
      },
      "name": "TableRestoreActivity",
      "hiveScript": "DROP TABLE IF EXISTS tempHiveTable;\n\nDROP TABLE IF EXISTS s3TempTable;\n\nCREATE EXTERNAL TABLE tempHiveTable (#{myDDBColDefn})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{myDDBTableName}\", \"dynamodb.column.mapping\" = \"#{myDDBTableColMapping}\");\n \nCREATE EXTERNAL TABLE s3TempTable (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{myInputS3Loc}' \nTBLPROPERTIES (\"skip.header.line.count\"=\"1\");\n \nINSERT OVERWRITE TABLE tempHiveTable SELECT * FROM s3TempTable;",
      "id": "TableRestoreActivity",
      "runsOn": { "ref": "EmrClusterForRestore" },
      "stage": "false",
      "type": "HiveActivity"
    },
    {
      "myComment": "The DynamoDB table into which we import data",
      "dataFormat": {
        "ref": "DDBExportFormat"
      },
      "name": "DynamoDB",
      "id": "DataNodeId_1ERqq",
      "type": "DynamoDBDataNode",
      "tableName": "#{myDDBTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{myLogUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrClusterForRestore",
      "coreInstanceType": "m4.large",
      "coreInstanceCount": "1",
      "masterInstanceType": "m4.large",
      "releaseLabel": "emr-5.20.0",
      "id": "EmrClusterForRestore",
      "type": "EmrCluster",
      "terminateAfter": "20 Minutes"
    },
    {
      "myComment": "The S3 path from which we import data",
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormatId_xqWRk"
      },
      "name": "S3DataNode",
      "id": "DataNodeId_cnlSW",
      "type": "S3DataNode"
    },
    {
      "myComment": "Format for the S3 Path",
      "name": "DefaultDataFormat1",
      "column": "not_used STRING",
      "id": "DataFormatId_xqWRk",
      "type": "CSV"
    },
    {
      "myComment": "Format for the DynamoDB table",
      "name": "DDBExportFormat",
      "id": "DDBExportFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "myInputS3Loc",
      "default": "s3://hello-aws-workshop-us-west-2-201901/data",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "myDDBTableName",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "myDDBTableColMapping",
      "default": "id:id,BicycleType:BicycleType,Brand:Brand,ProductCategory:ProductCategory,Price:Price,Title:Title",
      "type": "String"
    },
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColMapping",
      "default": "id BIGINT,BicycleType STRING,Brand STRING,Price DOUBLE,ProductCategory STRING,Title STRING",
      "type": "String"
    },
    {
      "description": "DynamoDB Column Mappings",
      "id": "myDDBColDefn",
      "default": "id BIGINT,BicycleType STRING,Brand STRING,Price DOUBLE,ProductCategory STRING,Title STRING",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "myLogUri",
      "type": "AWS::S3::ObjectKey"
    }
  ]
}

Some alternate parameter values

Column mapping (myDDBTableColMapping):

clientID:clientID,clientName:clientName,clientEmail:clientEmail,climate:climate,amenities:amenities,tripType:tripType,dates:dates

Column definitions (myS3ColMapping / myDDBColDefn):

clientID BIGINT,clientName STRING,clientEmail STRING,climate STRING,amenities STRING,tripType STRING,dates STRING
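If the pipeline is driven from the CLI, these values could be supplied as parameter overrides; a hedged sketch, with the pipeline id and definition file name as placeholders as in the earlier example:

aws datapipeline put-pipeline-definition --pipeline-id df-EXAMPLE_PIPELINE_ID \
  --pipeline-definition file://CSVtoDynamoDB.json \
  --parameter-values \
    'myDDBTableColMapping=clientID:clientID,clientName:clientName,clientEmail:clientEmail,climate:climate,amenities:amenities,tripType:tripType,dates:dates' \
    'myS3ColMapping=clientID BIGINT,clientName STRING,clientEmail STRING,climate STRING,amenities STRING,tripType STRING,dates STRING' \
    'myDDBColDefn=clientID BIGINT,clientName STRING,clientEmail STRING,climate STRING,amenities STRING,tripType STRING,dates STRING'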

Hive script: additional clause to skip the header line in the CSV file (escaped form for the hiveScript JSON string, and the plain Hive form):

 \nTBLPROPERTIES (\"skip.header.line.count\"=\"1\")

 TBLPROPERTIES ("skip.header.line.count"="1")
