Created
June 9, 2017 04:22
-
-
Save kangks/adbfdc5cf64126b3465b99689bb64aef to your computer and use it in GitHub Desktop.
Stack for S3 event -> Lambda transform from csv to json -> kinesis stream -> downstream Lambda for each row processing. These are the purposes of the demo: - Retry of the downstream Lambda when it fails: the record in the Kinesis Stream will not be removed until a successful run. - With a large CSV, the Lambda invocations run in parallel according to the number of shards of the Kinesis Stream.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
AWSTemplateFormatVersion: '2010-09-09'
Metadata:
  Comment: "Stack for S3 event -> Lambda transform from csv to json -> kinesis stream -> downstream Lambda for each row processing.
    These are the purpose of the demo:
    - Retry of downstream Lambda when failed. The record in Kinesis Stream will not be removed until success Runtime
    - With large CSV, the Lambda will parallel according to number of shards of Kinesis Stream"
Resources:
  # Execution role shared by both Lambda functions.
  # NOTE(review): Resource "*" on kinesis/s3/logs/xray is deliberately broad
  # for a POC; scope these to specific ARNs before any production use.
  IAMRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: "lambda-s3-kinesis"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: "lambda-s3-kinesis"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action: "kinesis:*"
                Resource: "*"
              - Effect: "Allow"
                Action: ["s3:GetObject"]
                Resource: "*"
              - Effect: "Allow"
                Action: "logs:*"
                Resource: "*"
              - Effect: "Allow"
                Action: "xray:*"
                Resource: "*"

  # Stream between the CSV slicer and the per-row processor. Four shards let
  # a large CSV fan out to up to four concurrent downstream invocations.
  CSVKinesisStream:
    Type: "AWS::Kinesis::Stream"
    Properties:
      Name: "CSVStream"
      ShardCount: 4
      Tags:
        - Key: Project
          Value: "CSV parsing POC"

  # S3-triggered function: streams the uploaded CSV, turns each data row into
  # a JSON object keyed by the header row, and puts it on the Kinesis stream.
  #
  # Fixes vs. the original gist:
  #   * ZipFile uses a literal block scalar (|) instead of a folded one (>):
  #     folding joins same-indent lines with spaces, which destroys '//'
  #     comments and statement layout in the embedded JavaScript.
  #   * The CSV header is kept on the parser instance; it used to be a local
  #     in parseLines, so every stream chunk after the first lost the column
  #     names and emitted rows with 'undefined' keys.
  #   * A partial trailing line of each stream chunk is buffered until the
  #     next chunk arrives; rows straddling chunk boundaries used to be
  #     corrupted.
  #   * CRLF input no longer yields empty bogus rows (the old
  #     replace(/\r/g,'\n') turned every \r\n into two newlines).
  #   * The handler completes only after every kinesis.putRecord has settled;
  #     it used to call back at end-of-stream while puts were still in flight,
  #     so Lambda could freeze/kill the pending writes.
  CSVSlicer:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: 'csvTransformer'
      Handler: "index.handler"
      Role: !GetAtt IAMRole.Arn
      Environment:
        Variables:
          STREAM_NAME: !Ref CSVKinesisStream
      Code:
        ZipFile: |
          'use strict';
          const aws = require('aws-sdk');
          aws.config.region = process.env.AWS_DEFAULT_REGION;
          const s3 = new aws.S3({ apiVersion: '2006-03-01' });
          const kinesis = new aws.Kinesis({ apiVersion: '2013-12-02' });
          const streamName = process.env.STREAM_NAME;
          class CSVParser {
            constructor(options) {
              this.delimiter = options.delimiter;
              this.lineNumber = 0;
              this.header = [];
              this.partial = ''; // incomplete trailing line carried between chunks
            }
            parseLines(lines, onDataCallback) {
              lines.forEach((line) => {
                if (line === '') { return; } // skip blanks from CRLF / trailing newline
                const cols = line.split(this.delimiter);
                this.lineNumber += 1;
                if (this.lineNumber === 1) {
                  this.header = cols; // first line is the header row
                } else {
                  const row = cols.reduce((map, value, index) => {
                    map[this.header[index]] = value;
                    return map;
                  }, { rowNumber: this.lineNumber });
                  onDataCallback(row);
                }
              });
            }
            parse(stream, onDataCallback, onCompleteCallback) {
              stream.on('data', (chunk) => {
                const text = this.partial + chunk.toString('utf8');
                const lines = text.split(/\r\n|\r|\n/);
                this.partial = lines.pop(); // may be a partial line; keep for next chunk
                this.parseLines(lines, onDataCallback);
              });
              stream.on('end', () => {
                if (this.partial !== '') {
                  this.parseLines([this.partial], onDataCallback); // flush last line
                  this.partial = '';
                }
                onCompleteCallback();
              });
            }
          }
          // Put one JSON row on the stream; partition by row number so rows
          // spread across shards.
          function postData(data, onErrorCallback, onSuccessCallback) {
            kinesis.putRecord({
              Data: JSON.stringify(data),
              PartitionKey: String(data.rowNumber),
              StreamName: streamName
            }, (err, response) => {
              if (err) { onErrorCallback(err); } else { onSuccessCallback(response); }
            });
          }
          exports.handler = (event, context, callback) => {
            const bucket = event.Records[0].s3.bucket.name;
            const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
            const s3Stream = s3.getObject({ Bucket: bucket, Key: key }, (err) => {
              if (err) {
                console.log(err);
                callback(`Error getting object ${key} from bucket ${bucket}. Make sure they exist and your bucket is in the same region as this function.`);
              }
            }).createReadStream();
            let rowCount = 0;
            let pending = 0;
            let ended = false;
            // Resolve the invocation only once the stream ended AND every
            // putRecord callback has fired.
            const finishIfDone = () => {
              if (ended && pending === 0) { callback(null, rowCount); }
            };
            const parser = new CSVParser({ delimiter: ',' });
            parser.parse(s3Stream,
              (row) => {
                rowCount += 1;
                pending += 1;
                postData(row,
                  (error) => { console.log('Error kinesis post:', error); pending -= 1; finishIfDone(); },
                  (response) => { console.log('Kinesis post:', response); pending -= 1; finishIfDone(); });
              },
              () => { ended = true; finishIfDone(); });
          };
      Runtime: "nodejs6.10"
      # 5 s was too tight for the "large CSV" case this demo advertises.
      Timeout: 60
      TracingConfig:
        Mode: "Active"
      Tags:
        - Key: Project
          Value: "CSV parsing POC"

  # Per-row processor fed by the Kinesis event source mapping. It fails
  # randomly ~50% of the time ON PURPOSE: a failed invocation is retried from
  # the stream, which is exactly the retry behaviour this demo exists to show.
  ApiTest:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: 'rowProcessor'
      Handler: "index.handler"
      Role: !GetAtt IAMRole.Arn
      TracingConfig:
        Mode: "Active"
      Code:
        ZipFile: |
          'use strict';
          // Decode each base64 Kinesis record back into the row object.
          function getData(event, callback) {
            if ('Records' in event) {
              event.Records.forEach((element) => {
                // Buffer.from replaces the deprecated new Buffer(...) constructor.
                const data = Buffer.from(element.kinesis.data, 'base64').toString('utf8');
                callback(JSON.parse(data));
              });
            }
          }
          exports.handler = (event, context, callback) => {
            console.log('Received event:', JSON.stringify(event, null, 2));
            getData(event, (rowData) => {
              console.log('received: ', rowData.rowNumber);
            });
            // Simulate work, then fail half the time so the Kinesis retry
            // behaviour can be observed.
            setTimeout(() => {
              if (Math.random() >= 0.5) {
                console.log('success');
                callback();
              } else {
                console.log('failed');
                callback('failed');
              }
            }, 1000);
          };
      Runtime: "nodejs6.10"
      Timeout: "5"
      Tags:
        - Key: Project
          Value: "CSV parsing POC"

  # BatchSize 1 => one row per downstream invocation, so a failure blocks and
  # retries exactly one record per shard.
  KinesisLambdaEventMapping:
    Type: "AWS::Lambda::EventSourceMapping"
    Properties:
      EventSourceArn: !GetAtt CSVKinesisStream.Arn
      FunctionName: !GetAtt ApiTest.Arn
      StartingPosition: "TRIM_HORIZON"
      BatchSize: 1
      Enabled: true

  # Lets S3 invoke the slicer. SourceArn is built from the literal bucket name
  # (it must match S3TestBucket's BucketName) instead of !GetAtt, which would
  # create a circular dependency; it restricts the permission to that one
  # bucket instead of any bucket in the account.
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      FunctionName: !GetAtt CSVSlicer.Arn
      Action: "lambda:InvokeFunction"
      Principal: "s3.amazonaws.com"
      SourceAccount:
        Ref: "AWS::AccountId"
      SourceArn: !Sub "arn:aws:s3:::${AWS::AccountId}-csv-test-s3"

  # DependsOn guarantees the invoke permission exists before S3 validates the
  # notification configuration; without it stack creation can fail
  # intermittently on resource ordering.
  S3TestBucket:
    Type: "AWS::S3::Bucket"
    DependsOn: LambdaInvokePermission
    Properties:
      BucketName: !Join ['-', [!Ref "AWS::AccountId", "csv-test-s3"]]
      NotificationConfiguration:
        LambdaConfigurations:
          - Function: !GetAtt CSVSlicer.Arn
            Event: "s3:ObjectCreated:*"
      Tags:
        - Key: Project
          Value: "CSV parsing POC"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment