Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
kinesys firehose by cloudwatch subscription to elasticsearch
/*
For processing data sent to Firehose by Cloudwatch Logs subscription filters.
Cloudwatch Logs sends to Firehose records that look like this:
{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "log_group_name",
"logStream": "log_stream_name",
"subscriptionFilters": [
"subscription_filter_name"
],
"logEvents": [
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208016,
"message": "log message 1"
},
{
"id": "01234567890123456789012345678901234567890123456789012345",
"timestamp": 1510109208017,
"message": "log message 2"
}
...
]
}
The data is additionally compressed with GZIP.
The code below will:
1) Gunzip the data
2) Parse the json
3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
processing error output. Such records do not contain any log events. You can modify the code to set the result to
Dropped instead to get rid of these records completely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
transformations on the log events.
5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
method.
6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
*/
const zlib = require('zlib');
const AWS = require('aws-sdk');
/**
* logEvent has this format:
*
* {
* "id": "01234567890123456789012345678901234567890123456789012345",
* "timestamp": 1510109208016,
* "message": "log message 1"
* }
*
* The default implementation below just extracts the message and appends a newline to it.
*
* The result must be returned in a Promise.
*/
function transformLogEvent(logEvent) {
return `${logEvent.message}\n`
}
function putRecordsToFirehoseStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
client.putRecordBatch({
DeliveryStreamName: streamName,
Records: records,
}, (err, data) => {
const codes = [];
let failed = [];
let errMsg = err;
if (err) {
failed = records;
} else {
for (let i = 0; i < data.RequestResponses.length; i++) {
const code = data.RequestResponses[i].ErrorCode;
if (code) {
codes.push(code);
failed.push(records[i]);
}
}
errMsg = `Individual error codes: ${codes}`;
}
if (failed.length > 0) {
if (attemptsMade + 1 < maxAttempts) {
console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
} else {
reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
}
} else {
resolve('');
}
});
}
function putRecordsToKinesisStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
client.putRecords({
StreamName: streamName,
Records: records,
}, (err, data) => {
const codes = [];
let failed = [];
let errMsg = err;
if (err) {
failed = records;
} else {
for (let i = 0; i < data.Records.length; i++) {
const code = data.Records[i].ErrorCode;
if (code) {
codes.push(code);
failed.push(records[i]);
}
}
errMsg = `Individual error codes: ${codes}`;
}
if (failed.length > 0) {
if (attemptsMade + 1 < maxAttempts) {
console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
} else {
reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
}
} else {
resolve('');
}
});
}
function createReingestionRecord(isSas, originalRecord) {
if (isSas) {
return {
Data: Buffer.from(originalRecord.data, 'base64'),
PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
};
} else {
return {
Data: Buffer.from(originalRecord.data, 'base64'),
};
}
}
function getReingestionRecord(isSas, reIngestionRecord) {
if (isSas) {
return {
Data: reIngestionRecord.data,
PartitionKey: reIngestionRecord.original.kinesisRecordMetadata.partitionKey
};
} else {
return {
Data: reIngestionRecord.data,
};
}
}
exports.handler = (event, context, callback) => {
Promise.all(event.records.map(r => {
const buffer = Buffer.from(r.data, 'base64');
let decompressed;
try {
decompressed = zlib.gunzipSync(buffer);
} catch (e) {
return [{
recordId: r.recordId,
result: 'ProcessingFailed',
}];
}
const data = JSON.parse(decompressed);
// CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
// They do not contain actual data.
if (data.messageType === 'CONTROL_MESSAGE') {
return [{
recordId: r.recordId,
result: 'Dropped',
}];
} else if (data.messageType === 'DATA_MESSAGE') {
return data.logEvents.map((logEvent, i) => {
let message = "";
try{
const m = JSON.parse(logEvent.message)
if(!m.timestamp){
let date = new Date();
if(m.time){
date = new Date(m.time)
}
m.timestamp = date.toISOString();
}
message = JSON.stringify(m)
}catch(e){
console.log("parse error: "+ logEvent.message);
if(i === 0){
return { recordId: r.recordId, result: 'Dropped'}
}
return null;
}
const encoded = Buffer.from(message).toString('base64');
if(i === 0){
return {
recordId: r.recordId,
result: 'Ok',
data: encoded,
};
} else {
return {
result: 'Ok',
original: r,
data: encoded,
};
}
}).filter((r)=> r);
} else {
return [Promise.resolve({
recordId: r.recordId,
result: 'ProcessingFailed',
})];
}
})).then(recs => {
const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
const region = streamARN.split(':')[3];
const streamName = streamARN.split('/')[1];
const firstRecords = []
const remains = []
recs.forEach((rec)=>{
rec.forEach((r)=>{
if(r.recordId){
firstRecords.push(r)
}else if(r.result === 'Ok'){
remains.push(r)
}
})
});
const result = { records: firstRecords};
let recordsToReingest = [];
const putRecordBatches = [];
let totalRecordsToBeReingested = 0;
for (let idx = 0; idx < remains.length; idx++) {
const rec = remains[idx];
totalRecordsToBeReingested++;
recordsToReingest.push(getReingestionRecord(isSas, rec));
if (recordsToReingest.length === 500) {
putRecordBatches.push(recordsToReingest);
recordsToReingest = [];
}
}
if (recordsToReingest.length > 0) {
// add the last batch
putRecordBatches.push(recordsToReingest);
}
if (putRecordBatches.length > 0) {
new Promise((resolve, reject) => {
let recordsReingestedSoFar = 0;
for (let idx = 0; idx < putRecordBatches.length; idx++) {
const recordBatch = putRecordBatches[idx];
if (isSas) {
const client = new AWS.Kinesis({ region: region });
putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
} else {
const client = new AWS.Firehose({ region: region });
putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
}
recordsReingestedSoFar += recordBatch.length;
console.log('Reingested %s/%s records in to %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, streamName);
}
}).then(
() => {
console.log('Reingested all %s records in to %s stream', totalRecordsToBeReingested, streamName);
callback(null, result);
},
failed => {
console.log('Failed to reingest records. %s', failed);
callback(failed, null);
});
} else {
console.log('No records needed to be reingested.');
callback(null, result);
}
}).catch(ex => {
console.log('Error: ', ex);
callback(ex, null);
});
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.