Skip to content

Instantly share code, notes, and snippets.

@JCotton1123
Created Mar 4, 2020
Embed
What would you like to do?
CloudTrail Log Decompression Lambda
package main
import (
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pkg/errors"
"compress/gzip"
"fmt"
"io"
"log"
"os"
"path"
"strings"
)
// Handler receives S3 event notifications and, for each record, downloads the
// gzip-compressed log object, decompresses it under /tmp, and uploads the
// result back to the same bucket under the source key with its trailing ".gz"
// removed.
//
// Behavior is configured through environment variables:
//   - AWS_REGION: region for the AWS session (default "us-east-1").
//   - SERVER_SIDE_ENCRYPTION: SSE algorithm for the upload (default "AES256").
//   - DELETE_SOURCE_LOG: when "true", delete the compressed source object
//     after a successful upload.
//   - DELETE_TEMP_FILES: unless "false", remove the temporary files when a
//     record finishes.
//
// Every record is attempted even if an earlier one fails. Each failure is
// logged, and a non-nil error reporting the failure count is returned if any
// record failed.
func Handler(s3Event events.S3Event) error {
	awsRegion := os.Getenv("AWS_REGION")
	if awsRegion == "" {
		awsRegion = "us-east-1"
	}
	sseAlgo := os.Getenv("SERVER_SIDE_ENCRYPTION")
	if sseAlgo == "" {
		sseAlgo = "AES256"
	}
	delSrcLog := os.Getenv("DELETE_SOURCE_LOG") == "true"
	delTempFiles := os.Getenv("DELETE_TEMP_FILES") != "false"

	sess, err := session.NewSession(&aws.Config{Region: aws.String(awsRegion)})
	if err != nil {
		return errors.Wrap(err, "Failed to create AWS session")
	}
	svc := s3.New(sess)
	downloader := s3manager.NewDownloader(sess)
	uploader := s3manager.NewUploader(sess)

	// processRecord handles a single event record. It is a closure (rather
	// than inline loop code) so that its defers run at the end of each record
	// instead of piling up until Handler returns.
	processRecord := func(record events.S3EventRecord) error {
		bucket := record.S3.Bucket.Name
		srcKey := record.S3.Object.Key
		srcURL := "s3://" + bucket + "/" + srcKey
		if !strings.HasSuffix(srcKey, "json.gz") {
			return fmt.Errorf("Event filter misconfigured, skipping non-log object: %s", srcURL)
		}
		log.Printf("Processing log: %s\n", srcURL)

		// Strip only the trailing ".gz". Removing the first ".gz" found
		// anywhere in the key would corrupt keys whose prefix contains ".gz"
		// and could make the two temp file paths below identical.
		dstKey := strings.TrimSuffix(srcKey, ".gz")
		dstURL := "s3://" + bucket + "/" + dstKey
		compFilePath := "/tmp/" + path.Base(srcKey)
		uncompFilePath := "/tmp/" + path.Base(dstKey)

		compFile, err := os.Create(compFilePath)
		if err != nil {
			return errors.Wrap(err, "Unable to create temporary file")
		}
		defer compFile.Close()
		if delTempFiles {
			defer os.Remove(compFile.Name())
		}

		_, err = downloader.Download(
			compFile,
			&s3.GetObjectInput{
				Bucket: &bucket,
				Key:    &srcKey,
			},
		)
		if err != nil {
			return errors.Wrap(err, "Failed to download log")
		}
		// Rewind so the gzip reader starts at the beginning of the download.
		if _, err = compFile.Seek(0, io.SeekStart); err != nil {
			return errors.Wrap(err, "Failed to rewind downloaded log")
		}

		uncompFile, err := os.Create(uncompFilePath)
		if err != nil {
			return errors.Wrap(err, "Failed to create temporary file")
		}
		defer uncompFile.Close()
		if delTempFiles {
			defer os.Remove(uncompFile.Name())
		}

		gzipReader, err := gzip.NewReader(compFile)
		if err != nil {
			return errors.Wrap(err, "Failed to create gzip reader")
		}
		defer gzipReader.Close()

		if _, err = io.Copy(uncompFile, gzipReader); err != nil {
			return errors.Wrap(err, "Failed to decompress log")
		}
		// Rewind so the uploader reads the decompressed data from the start.
		if _, err = uncompFile.Seek(0, io.SeekStart); err != nil {
			return errors.Wrap(err, "Failed to rewind decompressed log")
		}

		_, err = uploader.Upload(
			&s3manager.UploadInput{
				Bucket:               &bucket,
				Key:                  &dstKey,
				Body:                 uncompFile,
				ServerSideEncryption: &sseAlgo,
			},
		)
		if err != nil {
			return errors.Wrap(err, "Failed to upload log")
		}

		// The source is deleted only after the decompressed copy is uploaded.
		if delSrcLog {
			log.Printf("Deleting source log file: %s\n", srcURL)
			_, err = svc.DeleteObject(
				&s3.DeleteObjectInput{
					Bucket: &bucket,
					Key:    &srcKey,
				},
			)
			if err != nil {
				return errors.Wrap(err, "Failed to delete original log")
			}
		}

		log.Printf("Successfully uploaded decompressed log: %s\n", dstURL)
		return nil
	}

	var errs []error
	for _, record := range s3Event.Records {
		if err := processRecord(record); err != nil {
			errs = append(errs, err)
			log.Printf("Error: %s", err)
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("Encountered %v errors during this execution", len(errs))
	}
	return nil
}
func main() {
lambda.Start(Handler)
}
package main
import (
"github.com/aws/aws-lambda-go/events"
"encoding/json"
"os"
"strings"
"testing"
)
// S3_EVENT is a JSON template for an S3 event notification containing a single
// "ObjectCreated:Put" record. The {{bucket}} and {{object}} placeholders are
// meant to be substituted with a real bucket name and object key before the
// text is decoded into an events.S3Event.
const S3_EVENT = `
{
"Records": [
{
"eventVersion": "2.0",
"eventSource": "aws:s3",
"awsRegion": "us-east-1",
"eventTime": "1970-01-01T00:00:00.000Z",
"eventName": "ObjectCreated:Put",
"userIdentity": {
"principalId": "EXAMPLE"
},
"requestParameters": {
"sourceIPAddress": "127.0.0.1"
},
"responseElements": {
"x-amz-request-id": "EXAMPLE123456789",
"x-amz-id-2": "EXAMPLE123/5678abcdefghijklambdaisawesome/mnopqrstuvwxyzABCDEFGH"
},
"s3": {
"s3SchemaVersion": "1.0",
"configurationId": "testConfigRule",
"bucket": {
"name": "{{bucket}}",
"ownerIdentity": {
"principalId": "EXAMPLE"
},
"arn": "arn:aws:s3:::{{bucket}}"
},
"object": {
"key": "{{object}}",
"size": 1024,
"eTag": "0123456789abcdef0123456789abcdef",
"sequencer": "0A1B2C3D4E5F678901"
}
}
}
]
}
`
// Test builds an S3 event from the S3_EVENT template and passes it to Handler,
// failing if Handler returns an error.
//
// The bucket name and object key are read from the TEST_BUCKET and TEST_LOG
// environment variables; the test fails immediately if either is unset.
// Handler is called directly, so the named object must be one it can actually
// process.
func Test(t *testing.T) {
	testBucket := os.Getenv("TEST_BUCKET")
	if testBucket == "" {
		t.Fatal("TEST_BUCKET environment variable is not defined")
	}
	testLog := os.Getenv("TEST_LOG")
	if testLog == "" {
		t.Fatal("TEST_LOG environment variable is not defined")
	}

	// Fill the template placeholders with the configured bucket and key.
	testEventJSON := strings.ReplaceAll(S3_EVENT, "{{bucket}}", testBucket)
	testEventJSON = strings.ReplaceAll(testEventJSON, "{{object}}", testLog)

	var testEvent events.S3Event
	if err := json.Unmarshal([]byte(testEventJSON), &testEvent); err != nil {
		// Unmarshal decodes JSON, so report a decode failure.
		t.Fatalf("Failed to decode event: %s", err)
	}

	if err := Handler(testEvent); err != nil {
		t.Fatalf("Unexpected error: %s", err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment