
@mlapida
Last active October 3, 2019 01:01
A short Lambda function that can be subscribed to CloudWatch Logs (in this case VPC Flow Logs) and sends them to Kinesis Firehose for storage in S3. A full writeup can be found on my site: http://mlapida.com/thoughts/exporting-cloudwatch-logs-to-s3-lambda
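For reference, CloudWatch Logs hands the subscription payload to Lambda as base64-encoded, gzip-compressed JSON. A rough sketch of the shape the handler below expects (all values illustrative):

# event as received by the handler:
#   {"awslogs": {"data": "<base64-encoded gzip of the payload below>"}}
#
# payload after decoding and decompressing:
#   {
#     "messageType": "DATA_MESSAGE",
#     "logGroup": "FlowLogGroup",
#     "logStream": "eni-example",
#     "logEvents": [
#       {"id": "...", "timestamp": 1500000000000,
#        "message": "...",
#        "extractedFields": {"start": "...", "srcaddr": "...", "dstaddr": "...", "packets": "..."}}
#     ]
#   }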
import boto3
import logging
import json
import gzip
from StringIO import StringIO

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client('firehose')

def lambda_handler(event, context):

    # capture the CloudWatch log data
    outEvent = str(event['awslogs']['data'])

    # decode and unzip the log data
    outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64', 'strict'))).read()

    # convert the log data from JSON into a dictionary
    cleanEvent = json.loads(outEvent)

    # initiate a list
    s = []

    # set the name of the Kinesis Firehose stream
    firehoseName = 'FlowLogTest'

    # loop through the events line by line
    for t in cleanEvent['logEvents']:

        # transform the data and store it in the "Data" field
        # fields in Flow Logs: [version, accountid, interfaceid, srcaddr, dstaddr, srcport, dstport, protocol, packets, bytes, start, stop, action, logstatus]
        p = {
            'Data': str(t['extractedFields']['start']) + "," + str(t['extractedFields']['dstaddr']) + "," + str(t['extractedFields']['srcaddr']) + "," + str(t['extractedFields']['packets']) + "\n"
        }

        # write the record to our list
        s.append(p)

        # Firehose accepts at most 500 records per batch; flush when the limit is reached
        if len(s) > 499:
            # send the batch to Firehose
            SendToFireHose(firehoseName, s)
            # empty the list
            s = []

    # when done, send any remaining records to Firehose in bulk
    if len(s) > 0:
        SendToFireHose(firehoseName, s)

# function to send records to Kinesis Firehose
def SendToFireHose(streamName, records):
    response = client.put_record_batch(
        DeliveryStreamName=streamName,
        Records=records
    )

    # log the number of records written to Kinesis Firehose
    print "Wrote the following records to Firehose: " + str(len(records))
@ShadySQL

ShadySQL commented Feb 9, 2017

It would be nice if you could include all the fields on line 35. You cover only four, but the logs are more verbose than that; as a newbie, I wouldn't know how to make sense of the rest. Thanks for the write-up.
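For anyone landing here with the same question, a minimal sketch of emitting all fourteen fields, assuming the subscription filter extracts them under exactly the names listed in the code comment (adjust the list to match your own filter pattern):

# hypothetical drop-in replacement for the p = {...} block inside the loop
fields = ['version', 'accountid', 'interfaceid', 'srcaddr', 'dstaddr',
          'srcport', 'dstport', 'protocol', 'packets', 'bytes',
          'start', 'stop', 'action', 'logstatus']
p = {'Data': ",".join(str(t['extractedFields'][f]) for f in fields) + "\n"}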

@mbreitung

Thanks @mlapida for the example. I left everything as JSON for the logs, since Logstash and Splunk know how to parse JSON log lines:
s.append({'Data': "{d}\n".format(d=json.dumps(t['extractedFields']))})
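For illustration, each record delivered to Firehose then becomes one JSON object per line, e.g. {"srcaddr": "10.0.0.1", "dstaddr": "10.0.0.2", "packets": "3"} (values hypothetical), which a JSON-aware indexer can pick up without a custom parsing pattern.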

@dishmael

dishmael commented Jun 2, 2017

This was extremely helpful for figuring out how to properly parse the CloudWatch flow log. Thanks!

@mochowdhury

mochowdhury commented Nov 18, 2017

@mlapida - Thank you for this example. I am trying to demo this but running into some issues. Namely, I am getting the following error, and since I'm a noob with Python, I can't figure out how to resolve it.

I'm using this example not for VPC Flow Logs, but for Java application logs streamed from Beanstalk to CloudWatch; from there, I stream the CloudWatch log group to the Lambda function via a stream subscription.

I was hoping someone could help me understand the issue and point me in the right direction.


initial_value must be unicode or None, not str: TypeError
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 18, in lambda_handler
outEvent = gzip.GzipFile(fileobj=StringIO(outEvent.decode('base64','strict'))).read()
TypeError: initial_value must be unicode or None, not str

@cshanes

cshanes commented Nov 2, 2018

For anyone who wants to use this snippet with Python 3, the following decoding and unzipping code worked for me (note that it also needs import base64 and import io at the top of the file):

    # capture the CloudWatch log data
    out_event = str(event['awslogs']['data'])

    # decode and unzip the log data
    decoded = base64.b64decode(out_event)
    file = io.BytesIO(decoded)
    out_event = gzip.GzipFile(fileobj=file).read()

    # convert the log data from JSON into a dictionary
    clean_event = json.loads(out_event)

@sanudatta11

@cshanes please share the final code
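For anyone wanting the pieces assembled: a minimal, untested Python 3 sketch of the complete handler, combining the original batching logic with the decoding above (the stream name and the four selected fields are illustrative, as in the original gist):

import base64
import gzip
import io
import json
import logging

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

client = boto3.client('firehose')
FIREHOSE_NAME = 'FlowLogTest'  # illustrative stream name, as in the original

def lambda_handler(event, context):
    # decode and unzip the CloudWatch log data
    decoded = base64.b64decode(event['awslogs']['data'])
    out_event = gzip.GzipFile(fileobj=io.BytesIO(decoded)).read()

    # convert the log data from JSON into a dictionary
    clean_event = json.loads(out_event)

    records = []
    for t in clean_event['logEvents']:
        fields = t['extractedFields']
        # same four fields as the original gist; adjust to your filter pattern
        line = ",".join(str(fields[k]) for k in ('start', 'dstaddr', 'srcaddr', 'packets')) + "\n"
        records.append({'Data': line.encode('utf-8')})

        # Firehose accepts at most 500 records per batch
        if len(records) == 500:
            send_to_firehose(FIREHOSE_NAME, records)
            records = []

    # flush whatever is left
    if records:
        send_to_firehose(FIREHOSE_NAME, records)

def send_to_firehose(stream_name, records):
    client.put_record_batch(DeliveryStreamName=stream_name, Records=records)
    logger.info('Wrote %d records to Firehose', len(records))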
