Last active January 18, 2024 08:08
AWS Terraform configuration: Stream CloudWatch Logs to ElasticSearch


This snippet is a sample showing how to implement CloudWatch Logs streaming to ElasticSearch using Terraform. I wrote this gist because I couldn't find a clear, end-to-end example of how to achieve this task. In particular, I understood the resource "aws_lambda_permission" "cloudwatch_allow" part by reading a couple of bug reports plus this StackOverflow post.

The js file is actually the Lambda function automatically created by AWS when creating this pipeline through the web console. I only added handling of an endpoint variable so that it is configurable from Terraform.


Create a zip archive containing cwl2es.js at the root level of the archive. Run terraform plan to check for basic errors, then terraform apply.

// v1.1.2
// this lambda is the one automatically created by AWS
// when creating a CWL to ES stream using the AWS Console.
// I just added the `endpoint` variable handling.
var https = require('https');
var zlib = require('zlib');
var crypto = require('crypto');
// Process environment of the Lambda; the Terraform `aws_lambda_function`
// resource in this gist sets `es_endpoint` through its `environment` block.
const ENV = process.env;
// Hostname of the Elasticsearch endpoint that bulk requests are posted to.
var endpoint = ENV.es_endpoint;
exports.handler = function(input, context) {
// decode input from base64
var zippedInput = new Buffer(, 'base64');
// decompress the input
zlib.gunzip(zippedInput, function(error, buffer) {
if (error) {; return; }
// parse the input from JSON
var awslogsData = JSON.parse(buffer.toString('utf8'));
// transform the input to Elasticsearch documents
var elasticsearchBulkData = transform(awslogsData);
// skip control messages
if (!elasticsearchBulkData) {
console.log('Received a control message');
context.succeed('Control message handled successfully');
// post documents to the Amazon Elasticsearch Service
post(elasticsearchBulkData, function(error, success, statusCode, failedItems) {
console.log('Response: ' + JSON.stringify({
"statusCode": statusCode
if (error) {
console.log('Error: ' + JSON.stringify(error, null, 2));
if (failedItems && failedItems.length > 0) {
console.log("Failed Items: " +
JSON.stringify(failedItems, null, 2));
} else {
console.log('Success: ' + JSON.stringify(success));
/**
 * Converts a decoded CloudWatch Logs payload into an Elasticsearch bulk
 * request body (one action line plus one source line per log event).
 *
 * @param {Object} payload - Decoded payload with `messageType`, `owner`,
 *                           `logGroup`, `logStream` and `logEvents`.
 * @returns {?string} Newline-delimited bulk body, or null for control messages.
 */
function transform(payload) {
    if (payload.messageType === 'CONTROL_MESSAGE') {
        return null;
    }

    var bulkRequestBody = '';

    payload.logEvents.forEach(function(logEvent) {
        var timestamp = new Date(1 * logEvent.timestamp);

        // index name format: cwl-YYYY.MM.DD
        var indexName = [
            'cwl-' + timestamp.getUTCFullYear(),              // year
            ('0' + (timestamp.getUTCMonth() + 1)).slice(-2),  // month
            ('0' + timestamp.getUTCDate()).slice(-2)          // day
        ].join('.');

        var source = buildSource(logEvent.message, logEvent.extractedFields);
        source['@id'] = logEvent.id;
        source['@timestamp'] = timestamp.toISOString();
        source['@message'] = logEvent.message;
        source['@owner'] = payload.owner;
        source['@log_group'] = payload.logGroup;
        source['@log_stream'] = payload.logStream;

        var action = { "index": {} };
        action.index._index = indexName;
        action.index._type = payload.logGroup;
        action.index._id = logEvent.id;

        // Each bulk entry is an action line followed by its document line.
        bulkRequestBody += [
            JSON.stringify(action),
            JSON.stringify(source)
        ].join('\n') + '\n';
    });

    return bulkRequestBody;
}
/**
 * Builds the document body for one log event.
 *
 * When `extractedFields` is present, every non-empty field is copied: numeric
 * strings become numbers, and a field containing valid JSON is additionally
 * stored parsed under `'$' + key`. Otherwise the message itself is searched
 * for a trailing JSON object.
 *
 * @param {string} message          - Raw log message.
 * @param {Object} [extractedFields] - Fields extracted by the subscription filter.
 * @returns {Object} Document fields derived from the event.
 */
function buildSource(message, extractedFields) {
    // Declared locally; the original assigned to an undeclared variable,
    // which creates a global (and throws in strict mode).
    var jsonSubString;

    if (extractedFields) {
        var source = {};

        for (var key in extractedFields) {
            if (Object.prototype.hasOwnProperty.call(extractedFields, key) && extractedFields[key]) {
                var value = extractedFields[key];

                if (isNumeric(value)) {
                    source[key] = 1 * value;
                    continue;
                }

                jsonSubString = extractJson(value);
                if (jsonSubString !== null) {
                    source['$' + key] = JSON.parse(jsonSubString);
                }

                source[key] = value;
            }
        }
        return source;
    }

    jsonSubString = extractJson(message);
    if (jsonSubString !== null) {
        return JSON.parse(jsonSubString);
    }

    return {};
}
/**
 * Returns the part of `message` from its first '{' to the end when that
 * substring is valid JSON.
 *
 * @param {string} message - Text that may end with a JSON object.
 * @returns {?string} The JSON substring, or null when there is none.
 */
function extractJson(message) {
    // Non-string values cannot contain a JSON substring.
    if (typeof message !== 'string') return null;
    var jsonStart = message.indexOf('{');
    if (jsonStart < 0) return null;
    var jsonSubString = message.substring(jsonStart);
    return isValidJson(jsonSubString) ? jsonSubString : null;
}
/**
 * Reports whether `message` parses as JSON.
 *
 * @param {string} message - Candidate JSON text.
 * @returns {boolean} true when `JSON.parse` accepts the text.
 */
function isValidJson(message) {
    try {
        JSON.parse(message);
    } catch (e) { return false; }
    return true;
}
/**
 * Reports whether `n` is a finite number or a string that is entirely numeric.
 *
 * The coercing global `isFinite` is used on purpose: it converts the whole
 * string, so values such as '12abc' (which `parseFloat` accepts) are rejected.
 *
 * @param {*} n - Value to test.
 * @returns {boolean} true for finite numeric values.
 */
function isNumeric(n) {
    return !isNaN(parseFloat(n)) && isFinite(n);
}
/**
 * Sends a bulk request body to the Elasticsearch endpoint over HTTPS.
 *
 * @param {string}   body     - Newline-delimited bulk request body.
 * @param {Function} callback - Called as
 *     `callback(error, success, statusCode, failedItems)`; `error` is null
 *     when the response status is 200 and the bulk response reports no errors.
 */
function post(body, callback) {
    var requestParams = buildRequest(endpoint, body);

    var request = https.request(requestParams, function(response) {
        var responseBody = '';
        response.on('data', function(chunk) {
            responseBody += chunk;
        });
        response.on('end', function() {
            var info;
            try {
                info = JSON.parse(responseBody);
            } catch (e) {
                // A non-JSON body (e.g. a proxy or auth error page) is reported
                // through the callback instead of crashing the handler.
                callback({
                    "statusCode": response.statusCode,
                    "responseBody": responseBody
                }, undefined, response.statusCode, undefined);
                return;
            }

            var failedItems;
            var success;

            if (response.statusCode >= 200 && response.statusCode < 300) {
                var items = Array.isArray(info.items) ? info.items : [];

                failedItems = items.filter(function(x) {
                    return x.index.status >= 300;
                });

                success = {
                    "attemptedItems": items.length,
                    "successfulItems": items.length - failedItems.length,
                    "failedItems": failedItems.length
                };
            }

            var error = response.statusCode !== 200 || info.errors === true ? {
                "statusCode": response.statusCode,
                "responseBody": responseBody
            } : null;

            callback(error, success, response.statusCode, failedItems);
        });
    }).on('error', function(e) {
        callback(e);
    });

    request.end(requestParams.body);
}
/**
 * Builds the options for a signed `POST /_bulk` HTTPS request, using AWS
 * Signature Version 4 with the credentials found in the process environment
 * (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN).
 *
 * @param {string} endpoint - Host name of the form
 *                            `<domain>.<region>.<service>.amazonaws.com`.
 * @param {string} body     - Request body to sign and send.
 * @returns {Object} Options for `https.request`, plus a `body` property.
 * @throws {Error} When `endpoint` does not match the expected host pattern.
 */
function buildRequest(endpoint, body) {
    var endpointParts = String(endpoint).match(/^([^\.]+)\.?([^\.]*)\.?([^\.]*)\.amazonaws\.com$/);
    if (!endpointParts) {
        // Without this check an unset or malformed endpoint surfaces as an
        // opaque "Cannot read property '2' of null".
        throw new Error('Invalid Elasticsearch endpoint: ' + endpoint);
    }
    var region = endpointParts[2];
    var service = endpointParts[3];
    // ISO timestamp without separators or milliseconds, e.g. 20240118T080800Z
    var datetime = (new Date()).toISOString().replace(/[:\-]|\.\d{3}/g, '');
    var date = datetime.slice(0, 8);

    // Derive the signing key: date -> region -> service -> 'aws4_request'.
    var kDate = hmac('AWS4' + process.env.AWS_SECRET_ACCESS_KEY, date);
    var kRegion = hmac(kDate, region);
    var kService = hmac(kRegion, service);
    var kSigning = hmac(kService, 'aws4_request');

    var request = {
        host: endpoint,
        method: 'POST',
        path: '/_bulk',
        body: body,
        headers: {
            'Content-Type': 'application/json',
            'Host': endpoint,
            'Content-Length': Buffer.byteLength(body),
            'X-Amz-Security-Token': process.env.AWS_SESSION_TOKEN,
            'X-Amz-Date': datetime
        }
    };

    var canonicalHeaders = Object.keys(request.headers)
        .sort(function(a, b) { return a.toLowerCase() < b.toLowerCase() ? -1 : 1; })
        .map(function(k) { return k.toLowerCase() + ':' + request.headers[k]; })
        .join('\n');

    var signedHeaders = Object.keys(request.headers)
        .map(function(k) { return k.toLowerCase(); })
        .sort()
        .join(';');

    // The empty strings stand for the (empty) query string and the blank
    // line that terminates the canonical headers.
    var canonicalString = [
        request.method,
        request.path, '',
        canonicalHeaders, '',
        signedHeaders,
        hash(request.body, 'hex')
    ].join('\n');

    var credentialString = [ date, region, service, 'aws4_request' ].join('/');

    var stringToSign = [
        'AWS4-HMAC-SHA256',
        datetime,
        credentialString,
        hash(canonicalString, 'hex')
    ].join('\n');

    request.headers.Authorization = [
        'AWS4-HMAC-SHA256 Credential=' + process.env.AWS_ACCESS_KEY_ID + '/' + credentialString,
        'SignedHeaders=' + signedHeaders,
        'Signature=' + hmac(kSigning, stringToSign, 'hex')
    ].join(', ');

    return request;
}
/**
 * Computes the HMAC-SHA256 of a UTF-8 string.
 *
 * @param {string|Buffer} key        - HMAC key.
 * @param {string}        str        - Text to authenticate.
 * @param {string}        [encoding] - Digest encoding (e.g. 'hex'); a Buffer
 *                                     is returned when omitted.
 * @returns {string|Buffer} The digest.
 */
function hmac(key, str, encoding) {
    return crypto.createHmac('sha256', key).update(str, 'utf8').digest(encoding);
}
/**
 * Computes the SHA-256 digest of a UTF-8 string.
 *
 * @param {string} str        - Text to hash.
 * @param {string} [encoding] - Digest encoding (e.g. 'hex'); a Buffer is
 *                              returned when omitted.
 * @returns {string|Buffer} The digest.
 */
function hash(str, encoding) {
    return crypto.createHash('sha256').update(str, 'utf8').digest(encoding);
}
# Host name of the Elasticsearch endpoint; passed to the Lambda function as
# the `es_endpoint` environment variable. The Lambda code expects a host of
# the form <domain>.<region>.<service>.amazonaws.com. Must be set by the caller.
variable "es_endpoint" {
  type    = "string"
  default = ""
}
# Service principal allowed to invoke the Lambda function (used as the
# `principal` of the aws_lambda_permission below). Must be set by the caller;
# for CloudWatch Logs this is presumably the regional logs endpoint, e.g.
# logs.eu-central-1.amazonaws.com - confirm for your region.
variable "cwl_endpoint" {
  type    = "string"
  default = ""
}
# Log group whose events are streamed to Elasticsearch.
resource "aws_cloudwatch_log_group" "syslog-loggroup" {
  name              = "syslog"
  retention_in_days = 14
}
# Execution role for the streaming Lambda function; the trust policy lets the
# Lambda service assume it.
resource "aws_iam_role" "lambda_elasticsearch_execution_role" {
  name = "lambda_elasticsearch_execution_role"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}
# Permissions of the streaming Lambda function: write its own logs to
# CloudWatch Logs and POST documents to Elasticsearch domains.
# NOTE(review): the logs actions and resource below were missing from the
# source and are the usual Lambda logging permissions - confirm before use.
resource "aws_iam_role_policy" "lambda_elasticsearch_execution_policy" {
  name = "lambda_elasticsearch_execution_policy"
  role = "${aws_iam_role.lambda_elasticsearch_execution_role.id}"

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "es:ESHttpPost",
      "Resource": "arn:aws:es:*:*:*"
    }
  ]
}
EOF
}
# Lambda function that receives CloudWatch Logs events and posts them to
# Elasticsearch.
# NOTE(review): the archive name was missing from the source; "cwl2es.zip" is
# assumed here - it must be a zip containing cwl2es.js at its root.
resource "aws_lambda_function" "cwl_stream_lambda" {
  filename      = "cwl2es.zip"
  function_name = "LogsToElasticsearch"
  role          = "${aws_iam_role.lambda_elasticsearch_execution_role.arn}"

  # Lambda handler format is <file name without extension>.<exported function>;
  # the script is cwl2es.js and exports `handler`.
  handler          = "cwl2es.handler"
  source_code_hash = "${base64sha256(file("cwl2es.zip"))}"
  runtime          = "nodejs4.3"

  environment {
    variables = {
      es_endpoint = "${var.es_endpoint}"
    }
  }
}
# Allows the principal given in var.cwl_endpoint to invoke the streaming
# Lambda function for events coming from the syslog log group.
resource "aws_lambda_permission" "cloudwatch_allow" {
  statement_id  = "cloudwatch_allow"
  action        = "lambda:InvokeFunction"
  function_name = "${aws_lambda_function.cwl_stream_lambda.arn}"
  principal     = "${var.cwl_endpoint}"
  source_arn    = "${aws_cloudwatch_log_group.syslog-loggroup.arn}"
}
# Subscribes the Lambda function to the syslog log group. The empty filter
# pattern matches every event. Depends on the invoke permission so that the
# subscription is only created once the permission exists.
resource "aws_cloudwatch_log_subscription_filter" "cloudwatch_logs_to_es" {
  depends_on      = ["aws_lambda_permission.cloudwatch_allow"]
  name            = "cloudwatch_logs_to_elasticsearch"
  log_group_name  = "${aws_cloudwatch_log_group.syslog-loggroup.name}"
  filter_pattern  = ""
  destination_arn = "${aws_lambda_function.cwl_stream_lambda.arn}"
}
Running this in or after May 2020? Still works, but one caveat. The JS script is very outdated, but the easy solution is to simply walk through the web console build process [1], then once the new Lambda is automatically deployed, copy and paste that into your own script. Follow the advice above and you should have a fully-functioning deployment in no time!

Copy link

What is the cwl_endpoint variable, and what value should be provided for it? Can somebody help?

Copy link

@raj-yadav9 from what AWS region are your CloudWatch Logs originating? In the example below, they originate from the eu-central-1 region. Hope that makes sense?

variable "cwl_endpoint" {
  type    = "string"
  default = "logs.eu-central-1.amazonaws.com"
}

Copy link

When sending logs to ES I am getting this error. Can anyone help with this?
"errorType": "TypeError",
"errorMessage": "Cannot read property 'data' of undefined",
"trace": [
"TypeError: Cannot read property 'data' of undefined",
" at Runtime.exports.handler (/var/task/cwl2es.js:17:48)",
" at Runtime.handleOnce (/var/runtime/Runtime.js:66:25)"

