Skip to content

Instantly share code, notes, and snippets.

@tilfin
Last active May 15, 2017 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tilfin/6bb08a9ed5e693f89dbe6dec1617cb9f to your computer and use it in GitHub Desktop.
Save tilfin/6bb08a9ed5e693f89dbe6dec1617cb9f to your computer and use it in GitHub Desktop.
Kinesis Stream, Lambda, DynamoDB, S3 で Stream ベース実装に使える npm モジュール ref: http://qiita.com/tilfin/items/b4e71ec0b936f17e94dd
const es = require('event-stream');
const KSL = require('kinesis-stream-lambda');
exports.handler = function(event, context, callback) {
console.log('event: ', JSON.stringify(event, null, 2));
const result = [];
const stream = KSL.reader(event, { isAgg: false });
stream.on('end', function() {
console.dir(result);
callback(null, null);
});
stream.on('error', function(err) {
callback(err);
});
stream
.pipe(KSL.parseJSON({ expandArray: false }))
.pipe(es.map(function(data, callback) {
result.push(data);
callback(null, data)
}));
}
class DynamoWorkStream extends PacedWorkStream {
constructor() {
super({
concurrency: 10,
workMS: 80
});
}
_workPromise(data) {
return dp.proc(data, {
table: data.table,
useBatch: false
});
}
}
module.exports = DynamoWorkStream;
const PromisedLifestream = require('promised-lifestream');
exports.handle = function(event, context, callback) {
const workStream = new DynamoWorkStream();
PromisedLifestream([
KSL.reader(event, { isAgg: false }),
KSL.parseJSON({ expandArray: true }),
workStream
])
.then(() => {
callback(null, null);
})
.catch(err => {
callback(err); // 3つの stream どこかでエラーが起きてもここでキャッチできる
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment