AWSTemplateFormatVersion: '2010-09-09'
Metadata:
Comment: "Stack for S3 event -> Lambda transform from csv to json -> kinesis stream -> downstream Lambda for each row processing.
These are the purpose of the demo:
- Retry of downstream Lambda when failed. The record in Kinesis Stream will not be removed until success Runtime
- With large CSV, the Lambda will parallel according to number of shards of Kinesis Stream"
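
# For illustration, a hypothetical input file (not part of the stack) and the
# record csvTransformer would put on the stream for its first data row. The
# column names here are made up; the transformer maps each value onto the
# header of its column and tags the row number:
#
#   id,name,score      <- header line (line 1)
#   1,alice,42         <- first data row (line 2)
#
#   {"rowNumber":2,"id":"1","name":"alice","score":"42"}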
Resources:
  IAMRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: "lambda-s3-kinesis"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          -
            Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Policies:
        -
          PolicyName: "lambda-s3-kinesis"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              -
                Effect: "Allow"
                Action: "kinesis:*"
                Resource: "*"
              -
                Effect: "Allow"
                Action: ["s3:GetObject"]
                Resource: "*"
              -
                Effect: "Allow"
                Action: "logs:*"
                Resource: "*"
              -
                Effect: "Allow"
                Action: "xray:*"
                Resource: "*"
  CSVKinesisStream:
    Type: "AWS::Kinesis::Stream"
    Properties:
      Name: "CSVStream"
      ShardCount: 4
      Tags:
        -
          Key: Project
          Value: "CSV parsing POC"
  CSVSlicer:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: 'csvTransformer'
      Handler: "index.handler"
      Role: !GetAtt IAMRole.Arn
      Environment:
        Variables:
          STREAM_NAME: !Ref CSVKinesisStream
      Code:
        ZipFile: |
          'use strict';
          //const awsXRay = require('aws-xray-sdk-core');
          //const aws = awsXRay.captureAWS(require('aws-sdk'));
          const aws = require('aws-sdk');
          aws.config.region = process.env.AWS_DEFAULT_REGION;
          const s3 = new aws.S3({ apiVersion: '2006-03-01' });
          const lambda = new aws.Lambda({ apiVersion: '2015-03-31' });
          const kinesis = new aws.Kinesis({ apiVersion: '2013-12-02' });
          const streamName = process.env.STREAM_NAME;
          class CSVParser {
            constructor(options) {
              this.delimiter = options.delimiter;
              this.lineNumber = 0;
              this.header = [];
              this.remainder = ''; // partial line carried over between stream chunks
            }
            parseLines(lines, onDataCallback) {
              lines.map((line) => {
                var col = line.split(this.delimiter);
                ++this.lineNumber;
                if (this.lineNumber == 1) {
                  // The first line is the header row; keep it on the instance
                  // so it survives across chunks.
                  this.header = col;
                } else {
                  // Map each column value onto its header name, tagging the row number.
                  var result = col.reduce((map, curr, index) => {
                    map[this.header[index]] = curr;
                    return map;
                  }, { rowNumber: this.lineNumber });
                  console.log(result);
                  onDataCallback(result);
                }
              });
            }
            parse(stream, onDataCallback, onCompleteCallback) {
              stream.on('data', (chunk) => {
                // A chunk can end mid-line, so prepend the previous remainder
                // and hold back the trailing partial line for the next chunk.
                var input = this.remainder + chunk.toString('UTF-8');
                var lines = input.split(/\r\n|\r|\n/);
                this.remainder = lines.pop();
                this.parseLines(lines, onDataCallback);
              });
              stream.on('end', () => {
                if (this.remainder.trim().length > 0) {
                  this.parseLines([this.remainder], onDataCallback);
                }
                console.log("Read stream ended");
                onCompleteCallback();
              });
              stream.on('close', () => {
                console.log("Read stream closed");
              });
            }
          }
          function postData(data, onErrorCallback, onSuccessCallback) {
            kinesisPost(data, onErrorCallback, onSuccessCallback);
          }
          function kinesisPost(data, onErrorCallback, onSuccessCallback) {
            kinesis.putRecord(
              {
                Data: JSON.stringify(data),
                // Partition by row number so rows distribute across the shards.
                PartitionKey: (data.rowNumber).toString(),
                StreamName: streamName
              },
              function(err, response) {
                if (err) {
                  console.log('err:', err);
                  onErrorCallback(err);
                } else {
                  console.log('data:', data);
                  console.log('shard:', response);
                  onSuccessCallback(response);
                }
              }
            );
          }
          function parseCSV(stream, onDataCallback, onCompleteCallback) {
            let csvParser = new CSVParser({ delimiter: "," });
            csvParser.parse(stream, onDataCallback, onCompleteCallback);
          }
          exports.handler = (event, context, callback) => {
            console.log('Received event:', JSON.stringify(event, null, 2));
            // Get the object from the event and show its content type
            const bucket = event.Records[0].s3.bucket.name;
            const key = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
            const params = {
              Bucket: bucket,
              Key: key,
            };
            var s3Stream =
              s3.getObject(params, (err, data) => {
                if (err) {
                  console.log(err);
                  const message = `Error getting object ${key} from bucket ${bucket}. Make sure they exist and your bucket is in the same region as this function.`;
                  console.log(message);
                  callback(message);
                } else {
                  console.log('CONTENT TYPE:', data.ContentType);
                }
              }).createReadStream();
            var rowCount = 0;
            parseCSV(s3Stream,
              (data) => {
                rowCount = rowCount + 1;
                postData(data,
                  (error) => {
                    console.log("Error kinesis post: ", error);
                  },
                  (response) => {
                    console.log("Kinesis post: ", response);
                  }
                );
              },
              () => {
                callback(null, rowCount);
              }
            );
          };
Runtime: "nodejs6.10"
Timeout: "5"
TracingConfig:
Mode:
Active
Tags:
-
Key: Project
Value: "CSV parsing POC"
  ApiTest:
    Type: "AWS::Lambda::Function"
    Properties:
      FunctionName: 'rowProcessor'
      Handler: "index.handler"
      Role: !GetAtt IAMRole.Arn
      TracingConfig:
        Mode: "Active"
      Code:
        ZipFile: |
          'use strict';
          function getData(event, callback) {
            if ('Records' in event) {
              event.Records.forEach((element) => {
                // Kinesis record payloads arrive base64-encoded.
                var data = Buffer.from(element.kinesis.data, 'base64').toString('utf8');
                callback(JSON.parse(data));
              });
            }
          }
          exports.handler = (event, context, callback) => {
            console.log('Received event:', JSON.stringify(event, null, 2));
            getData(event, (rowData) => {
              var rowNumber = rowData.rowNumber;
              console.log("received: ", rowNumber);
            });
            // Fail roughly half of the time to demonstrate that the Kinesis
            // event source retries the same record until the function succeeds.
            setTimeout(() => {
              var random = Math.random();
              if (random >= 0.5) {
                console.log('success');
                callback();
              } else {
                console.log('failed');
                callback('failed');
              }
            }, 1000);
          };
Runtime: "nodejs6.10"
Timeout: "5"
Tags:
-
Key: Project
Value: "CSV parsing POC"
  KinesisLambdaEventMapping:
    Type: "AWS::Lambda::EventSourceMapping"
    Properties:
      EventSourceArn: !GetAtt CSVKinesisStream.Arn
      FunctionName: !GetAtt ApiTest.Arn
      StartingPosition: "TRIM_HORIZON"
      BatchSize: 1
      Enabled: true
  LambdaInvokePermission:
    Type: "AWS::Lambda::Permission"
    Properties:
      FunctionName: !GetAtt CSVSlicer.Arn
      Action: "lambda:InvokeFunction"
      Principal: "s3.amazonaws.com"
      SourceAccount: !Ref "AWS::AccountId"
  S3TestBucket:
    Type: "AWS::S3::Bucket"
    Properties:
      BucketName: !Join ['-', [!Ref "AWS::AccountId", "csv-test-s3"]]
      NotificationConfiguration:
        LambdaConfigurations:
          -
            Function: !GetAtt CSVSlicer.Arn
            Event: "s3:ObjectCreated:*"
      Tags:
        -
          Key: Project
          Value: "CSV parsing POC"