@iMilnb iMilnb/README.md
Last active Oct 26, 2019

AWS Terraform configuration: Stream CloudWatch Logs to ElasticSearch

Rationale

This snippet is a sample showing how to implement CloudWatch Logs streaming to Elasticsearch using Terraform. I wrote this gist because I couldn't find a clear, end-to-end example of how to achieve this task. In particular, I only understood the resource "aws_lambda_permission" "cloudwatch_allow" part by reading a couple of bug reports plus this Stack Overflow post.

The js file is the Lambda function AWS creates automatically when you set up this pipeline through the web console. I only added handling for an endpoint variable so the Elasticsearch endpoint is configurable from Terraform.

Usage

Create a cwl2eslambda.zip file containing cwl2es.js at the root level. Invoke terraform plan to check for basic errors, then terraform apply.
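
For reference, a minimal command sequence (assuming Terraform is installed and AWS credentials are already configured):

zip cwl2eslambda.zip cwl2es.js   # cwl2es.js must sit at the zip's root level
terraform plan                   # check for basic errors
terraform apply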

// v1.1.2
//
// this lambda is the one automatically created by AWS
// when creating a CWL to ES stream using the AWS Console.
// I just added the `endpoint` variable handling.
//
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
const ENV = process.env;
var endpoint = ENV.es_endpoint;
exports.handler = function(input, context) {
    // decode input from base64
    var zippedInput = new Buffer(input.awslogs.data, 'base64');

    // decompress the input
    zlib.gunzip(zippedInput, function(error, buffer) {
        if (error) { context.fail(error); return; }

        // parse the input from JSON
        var awslogsData = JSON.parse(buffer.toString('utf8'));

        // transform the input to Elasticsearch documents
        var elasticsearchBulkData = transform(awslogsData);

        // skip control messages
        if (!elasticsearchBulkData) {
            console.log('Received a control message');
            context.succeed('Control message handled successfully');
            return;
        }

        // post documents to the Amazon Elasticsearch Service
        post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
            console.log('Response: ' + JSON.stringify({
                "statusCode": statusCode
            }));

            if (error) {
                console.log('Error: ' + JSON.stringify(error, null, 2));

                if (failedItems && failedItems.length > 0) {
                    console.log("Failed Items: " +
                        JSON.stringify(failedItems, null, 2));
                }

                context.fail(JSON.stringify(error));
            } else {
                console.log('Success: ' + JSON.stringify(success));
                context.succeed('Success');
            }
        });
    });
};
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // index name format: cwl-YYYY.MM.DD
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = new Date(1 * logEvent.timestamp).toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source),
        ].join('\n') + '\n';
    });
    return bulkRequestBody;
}
function buildSource(message, extractedFields) {
    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (extractedFields.hasOwnProperty(key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                var jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    var jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}
function extractJson(message) {
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}

function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}

function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });
        response.on('end', function() {
            var info = JSON.parse(responseBody);
            var failedItems;
            var success;

            // treat any 2xx response as a success
            if (response.statusCode >= 200 && response.statusCode < 300) {
                failedItems = info.items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": info.items.length,
                    "successfulItems": info.items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            var error = response.statusCode !== 200 || info.errors === true ? {
                "statusCode": response.statusCode,
                "responseBody": responseBody
            } : null;

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });
    request.end(requestParams.body);
}
function buildRequest(endpoint, body) {
    var endpointParts = endpoint.match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    var region = endpointParts[2];
    var service = endpointParts[3];
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.substr(0, 8);

    // derive the AWS Signature Version 4 signing key
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    // build the SigV4 canonical request and string to sign
    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex'),
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}
function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}

function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
variable "es_endpoint" {
type = "string"
default = "elasticsearch.endpoint.es.amazonaws.com"
}
variable "cwl_endpoint" {
type = "string"
default = "logs.eu-central-1.amazonaws.com"
}
resource "aws_cloudwatch_log_group" "syslog-loggroup" {
name = "syslog"
retention_in_days = 14
}
resource "aws_iam_role" "lambda_elasticsearch_execution_role" {
name = "lambda_elasticsearch_execution_role"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Effect": "Allow"
}
]
}
EOF
}
resource "aws_iam_role_policy" "lambda_elasticsearch_execution_policy" {
name = "lambda_elasticsearch_execution_policy"
role = "${aws_iam_role.lambda_elasticsearch_execution_role.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": [
"arn:aws:logs:*:*:*"
]
},
{
"Effect": "Allow",
"Action": "es:ESHttpPost",
"Resource": "arn:aws:es:*:*:*"
}
]
}
EOF
}
resource "aws_lambda_function" "cwl_stream_lambda" {
filename = "cwl2eslambda.zip"
function_name = "LogsToElasticsearch"
role = "${aws_iam_role.lambda_elasticsearch_execution_role.arn}"
handler = "exports.handler"
source_code_hash = "${base64sha256(file("cwl2eslambda.zip"))}"
runtime = "nodejs4.3"
environment {
variables = {
es_endpoint = "${var.es_endpoint}"
}
}
}
resource "aws_lambda_permission" "cloudwatch_allow" {
statement_id = "cloudwatch_allow"
action = "lambda:InvokeFunction"
function_name = "${aws_lambda_function.cwl_stream_lambda.arn}"
principal = "${var.cwl_endpoint}"
source_arn = "${aws_cloudwatch_log_group.syslog-loggroup.arn}"
}
resource "aws_cloudwatch_log_subscription_filter" "cloudwatch_logs_to_es" {
depends_on = ["aws_lambda_permission.cloudwatch_allow"]
name = "cloudwatch_logs_to_elasticsearch"
log_group_name = "${aws_cloudwatch_log_group.syslog-loggroup.name}"
filter_pattern = ""
destination_arn = "${aws_lambda_function.cwl_stream_lambda.arn}"
}
@varsy

varsy commented Oct 24, 2017

Thank you for your gist, much appreciated!

To make it work you need to change handler = "exports.handler" (line 66) to handler = "<your_lambda_file_name>.handler".
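
Given that the README says to zip cwl2es.js at the root level, that line would read:

handler = "cwl2es.handler"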

@gordonbondon

gordonbondon commented Nov 22, 2017

Thanks! This one worked for me perfectly.

@putarpuar

putarpuar commented Feb 20, 2018

Thanks. Is it possible to attach two CWL groups to one ES endpoint, or do I have to create one ES instance for every log group?

@claaslisowski

claaslisowski commented Oct 19, 2018

@putarpuar
Probably too late for you, but maybe helpful for others.

The answer to your question is: yes, you can get logs from multiple log groups into the same ES. The log groups, however, are not attached to ES directly but to the Lambda function, which specifies the ES endpoint it then sends the logs to.

That can also be automated with another Lambda function that auto-subscribes new log groups (optionally matching a specific pattern) to the ES logs Lambda function.
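
In Terraform terms, each additional log group just needs its own aws_lambda_permission and aws_cloudwatch_log_subscription_filter pointing at the same Lambda function. A minimal sketch reusing the resources from this gist, with a hypothetical second log group named "nginx":

resource "aws_cloudwatch_log_group" "nginx-loggroup" {
  name              = "nginx"
  retention_in_days = 14
}

resource "aws_lambda_permission" "cloudwatch_allow_nginx" {
  statement_id  = "cloudwatch_allow_nginx"
  action        = "lambda:InvokeFunction"
  function_name = "${aws_lambda_function.cwl_stream_lambda.arn}"
  principal     = "${var.cwl_endpoint}"
  source_arn    = "${aws_cloudwatch_log_group.nginx-loggroup.arn}"
}

resource "aws_cloudwatch_log_subscription_filter" "nginx_logs_to_es" {
  depends_on      = ["aws_lambda_permission.cloudwatch_allow_nginx"]
  name            = "nginx_logs_to_elasticsearch"
  log_group_name  = "${aws_cloudwatch_log_group.nginx-loggroup.name}"
  filter_pattern  = ""
  destination_arn = "${aws_lambda_function.cwl_stream_lambda.arn}"
}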

@kurron

kurron commented Oct 25, 2018

Just wanted to drop you a note letting you know I appreciate you sharing the code. It helped kick-start my own work. I'm using a VPC endpoint to access my Elasticsearch cluster, so you have to make some minor modifications to make it work. One is to expand the policy to allow for the creation of some network objects:

        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "ec2:CreateNetworkInterface",
                "ec2:DescribeNetworkInterfaces",
                "ec2:DeleteNetworkInterface"
            ],
            "Resource": "*"
        },

The other is to fill in the vpc_config block of the aws_lambda_function resource with your subnet and security group IDs, as sketched below.
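
A minimal sketch of that block, with placeholder IDs (both are assumptions, substitute your own):

resource "aws_lambda_function" "cwl_stream_lambda" {
  # ... same arguments as in the gist above ...

  vpc_config {
    subnet_ids         = ["subnet-00000000"]  # placeholder: private subnets that can reach ES
    security_group_ids = ["sg-00000000"]      # placeholder: must allow outbound HTTPS
  }
}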

@ga-tb

ga-tb commented Apr 2, 2019

Anybody using this tool and seeing the following error:

Rejecting mapping update to [cwl-2019.03.29] as the final mapping would have more than 1 type: [/aws/ecs/some-api, /aws/lambda/some-app]

Mapping types for new indexes are deprecated as of ES6.0, and will be removed altogether in 8.0.

To fix: change this line https://gist.github.com/iMilnb/27726a5004c0d4dc3dba3de01c65c575#file-cwl2es-js-L86

// Old
action.index._type = payload.logGroup;
// New
action.index._type = "_doc";

The value of logGroup is still stored using the @log_group attribute so hopefully no functionality is lost.

@ericgvt9

ericgvt9 commented Sep 24, 2019

Thanks for this gist. By the way, those on Terraform 0.12.x who encounter the following UTF-8 error can consider the following updates:
Message:

Call to function "file" failed: contents of cwl2eslambda.zip
are not valid UTF-8; use the filebase64 function to obtain the Base64 encoded
contents or the other file functions (e.g. filemd5, filesha256) to obtain file
hashing results instead.

Updates:
data "archive_file" "cwl2eslambda" {
type = "zip"
source_file = "cwl2es.js"
output_path = "cwl2eslambda.zip"
}

resource "aws_lambda_function" "cwl_stream_lambda" {
filename = "cwl2eslambda.zip"
function_name = "LogsToElasticsearch"
role = "${aws_iam_role.lambda_elasticsearch_execution_role.arn}"
handler = "exports.handler"
source_code_hash = "${filebase64sha256(data.archive_file.cwl2eslambda.output_path)}"

runtime = "nodejs4.3"

environment {
variables = {
es_endpoint = "${var.es_endpoint}"
}
}
}
