Skip to content

Instantly share code, notes, and snippets.

@ikrauchanka
Created March 24, 2017 22:26
Show Gist options
  • Save ikrauchanka/3187e524986c7d31d7d02253f92e5590 to your computer and use it in GitHub Desktop.
Save ikrauchanka/3187e524986c7d31d7d02253f92e5590 to your computer and use it in GitHub Desktop.
read AWS FlowLogs
/*
The program requires read access to S3 objects.
It downloads a gzip-compressed flow-log file, decompresses it, reads the
flat-text records and converts each record into JSON.
You can use the output as data in HTTP requests or stream it into
Elasticsearch (or an ELK stack).
INFO: https://aws.amazon.com/blogs/aws/vpc-flow-logs-log-and-view-network-traffic-flows/
*/
package main
import (
"bufio"
"compress/gzip"
"encoding/json"
"fmt"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
)
// Location and layout of the flow-log object in S3.
const (
	// bucketName is the S3 bucket holding the flow logs.
	bucketName = "my-bucket-with-flowlogs"

	// prefixName is the object key of the gzip file to read. It is passed as
	// the GetObject key, so it must name a single object, not a listing prefix.
	prefixName = "/flow-logs/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx/eni-xxxxxxxx-all/000000.gz"

	// flowFileDelimeter separates the fields within each line of the file.
	flowFileDelimeter = " "
)

// Local and SDK tuning settings.
//
// NOTE(review): none of these are referenced by the code visible in this
// file; they look like leftovers — confirm before removing.
const (
	// TmpDir is a temporary directory path.
	TmpDir = "/Users/ilja"

	// DefaultDownloadConcurrency is the number of goroutines to spin up when
	// using Download().
	DefaultDownloadConcurrency = 5

	// DefaultDownloadPartSize is the byte range fetched per request when using
	// Download(): 5 MiB.
	DefaultDownloadPartSize = 5 << 20

	// DefaultSharedConfigProfile is the name of the shared-config profile.
	DefaultSharedConfigProfile = `default`
)
// flowLogStructure is the JSON shape of one flow-log record.
//
// All fields are strings. The json tags give the output key names, and the
// declaration order below is the key order json.Marshal emits, so fields must
// not be reordered.
type flowLogStructure struct {
	// "time" and "version".
	Ttime    string `json:"time"`
	Tversion string `json:"version"`

	// Account and network-interface identifiers.
	TaccountID   string `json:"account-id"`
	TinterfaceID string `json:"interface-id"`

	// Source/destination addresses and ports, plus protocol.
	Tsrcaddr  string `json:"srcaddr"`
	Tdstaddr  string `json:"dstaddr"`
	Tsrcport  string `json:"srcport"`
	Tdstport  string `json:"dstport"`
	Tprotocol string `json:"protocol"`

	// Packet/byte counts and the "start"/"end" values.
	Tpackets string `json:"packets"`
	Tbytes   string `json:"bytes"`
	Tstart   string `json:"start"`
	Tend     string `json:"end"`

	// "action" and "log-status".
	Taction    string `json:"action"`
	TlogStatus string `json:"log-status"`
}
// connectToAWS creates an AWS session for the us-east-1 region.
//
// It panics if the session cannot be created: this command-line program has
// no way to continue without one. The panic message includes the underlying
// error so the cause is visible.
func connectToAWS() *session.Session {
	sess, err := session.NewSession(&aws.Config{Region: aws.String("us-east-1")})
	if err != nil {
		// session.Must would panic here too, but with a bare error; check the
		// error directly so the intended message is actually reported.
		panic(fmt.Sprintf("problems with connection to AWS: %v", err))
	}
	return sess
}
// getCloadWatchFile2JSON fetches the gzip-compressed object s3://bucket/prefix,
// decompresses it, and converts every non-blank line into a JSON-encoded
// flowLogStructure.
//
// Parameters:
//   - bucket: S3 bucket name.
//   - prefix: object key of the .gz file. Despite the name, it is passed as
//     GetObjectInput.Key, so it must name a single object.
//   - svc: S3 client used for the request.
//
// The result maps the zero-based index of each non-blank record to its JSON
// text, so keys run 0..len(result)-1 in file order. Go map iteration order is
// random; callers that need file order must walk the indices.
//
// A non-nil error (with a nil map) is returned in several cases:
//   - the object cannot be fetched;
//   - the object is not valid gzip data;
//   - a line has too few fields;
//   - the stream cannot be read to the end.
func getCloadWatchFile2JSON(bucket string, prefix string, svc *s3.S3) (map[int]string, error) {
	resp, err := svc.GetObject(&s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(prefix),
	})
	if err != nil {
		return nil, fmt.Errorf("getting s3://%s/%s: %w", bucket, prefix, err)
	}
	// The body is a network stream; it must be closed to release the connection.
	defer resp.Body.Close()

	gz, err := gzip.NewReader(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("opening gzip stream of s3://%s/%s: %w", bucket, prefix, err)
	}
	defer gz.Close()

	res := make(map[int]string)
	scanner := bufio.NewScanner(gz)
	for line := 1; scanner.Scan(); line++ {
		row := scanner.Text()
		if strings.TrimSpace(row) == "" {
			// Blank lines carry no record; skip them without consuming an index.
			continue
		}
		rec, perr := parseFlowLogRecord(row)
		if perr != nil {
			return nil, fmt.Errorf("s3://%s/%s line %d: %w", bucket, prefix, line, perr)
		}
		b, merr := json.Marshal(rec)
		if merr != nil {
			return nil, fmt.Errorf("s3://%s/%s line %d: encoding json: %w", bucket, prefix, line, merr)
		}
		res[len(res)] = string(b)
	}
	// Scan returns false both at EOF and on failure; Err tells them apart and
	// reports read errors from the gzip stream (e.g. truncation or checksum).
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("reading s3://%s/%s: %w", bucket, prefix, err)
	}
	return res, nil
}

// parseFlowLogRecord splits row on flowFileDelimeter and assigns the first 15
// tokens, in order, to the fields of flowLogStructure. Tokens beyond the 15th
// are ignored. A row with fewer than 15 tokens is reported as an error instead
// of causing an index-out-of-range panic.
func parseFlowLogRecord(row string) (flowLogStructure, error) {
	const fieldCount = 15
	f := strings.Split(row, flowFileDelimeter)
	if len(f) < fieldCount {
		return flowLogStructure{}, fmt.Errorf("expected at least %d fields, got %d: %q", fieldCount, len(f), row)
	}
	return flowLogStructure{
		Ttime:        f[0],
		Tversion:     f[1],
		TaccountID:   f[2],
		TinterfaceID: f[3],
		Tsrcaddr:     f[4],
		Tdstaddr:     f[5],
		Tsrcport:     f[6],
		Tdstport:     f[7],
		Tprotocol:    f[8],
		Tpackets:     f[9],
		Tbytes:       f[10],
		Tstart:       f[11],
		Tend:         f[12],
		Taction:      f[13],
		TlogStatus:   f[14],
	}, nil
}
// main reads the configured flow-log object from S3 and prints one JSON
// document per record to stdout, in file order.
func main() {
	svc := s3.New(connectToAWS())
	data, err := getCloadWatchFile2JSON(bucketName, prefixName, svc)
	if err != nil {
		// Fail loudly with a non-zero exit status instead of silently printing
		// nothing; this matches connectToAWS, which also panics on failure.
		panic(fmt.Sprintf("reading flow logs from s3://%s/%s: %v", bucketName, prefixName, err))
	}
	// data is keyed by record index 0..len-1. Ranging over the map would print
	// records in random order, so walk the indices explicitly.
	for i := 0; i < len(data); i++ {
		fmt.Println(data[i])
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment