// Source: GitHub gist tfutada/99124a5c0d8ae178e808ac03b455e516 by @tfutada,
// created September 17, 2016.
package main
import (
"strings"
"fmt"
"log"
"net/http"
"io/ioutil"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/elasticsearchservice"
apex "github.com/apex/go-apex"
apexS3 "github.com/apex/go-apex/s3"
"gopkg.in/olivere/elastic.v3"
"github.com/edoardo849/apex-aws-signer"
)
// Here is the basic flow of this code.
// 1. Receive an S3 event whose records each name an S3 bucket and an object key (an ELB access-log file).
// 2. Loop over the records in the S3 event, and for each one
// 3. get the S3 object with the given key,
// 4. loop over the lines of the obtained file,
// 5. parse each line, which represents a single ELB access-log entry, and
// 6. put it into Elasticsearch as a document.
// Elasticsearch destination for the parsed access-log documents.

// esUrl is the endpoint of the Amazon Elasticsearch Service domain ("XXX" is a placeholder host).
const esUrl = "https://XXX.ap-northeast-1.es.amazonaws.com"

// esIndex is the index every access-log document is written to.
const esIndex = "elblogidx"

// esType is the document type used when indexing.
const esType = "elb"
// ElbAccessLog is one classic ELB access-log entry in the shape that is sent
// to Elasticsearch as a JSON document. Every value is kept as the raw string
// token from the log line; nothing is converted to numbers or times.
type ElbAccessLog struct {
	// Log timestamp and the name of the load balancer that wrote the entry.
	Timestamp string `json:"timestamp"`
	Elb       string `json:"elb"`

	// Client and backend endpoints as written in the log (ip:port).
	ClientIpAddress  string `json:"client_ip_address"`
	BackendIpAddress string `json:"backend_ip_address"`

	// Processing times, kept as logged (seconds per the ELB log format — TODO confirm).
	RequestProcessingTime  string `json:"request_processing_time"`
	BackendProcessingTime  string `json:"backend_processing_time"`
	ResponseProcessingTime string `json:"response_processing_time"`

	// Status codes from the load balancer and from the backend.
	ElbStatusCode     string `json:"elb_status_code"`
	BackendStatusCode string `json:"backend_status_code"`

	// Payload sizes.
	ReceivedBytes string `json:"received_bytes"`
	SentBytes     string `json:"sent_bytes"`

	// The request field of the log line.
	Request string `json:"request"`
}
// main registers an Apex S3 event handler. For every S3 record in the event it
// fetches the referenced object (an ELB access-log file) and indexes each
// non-empty line into Elasticsearch. The first failure is returned from the
// handler to apex (presumably reported as a failed invocation) instead of
// terminating the process with log.Fatal/panic.
func main() {
	apexS3.HandleFunc(func(event *apexS3.Event, ctx *apex.Context) error {
		for _, rec := range event.Records {
			if err := indexS3Record(rec.AWSRegion, rec.S3.Bucket.Name, rec.S3.Object.Key); err != nil {
				return err
			}
		}
		return nil
	})
}

// indexS3Record downloads s3://bucket/key from the given region and indexes
// every non-empty line of it into esIndex at esUrl.
//
// It is a separate function so that the deferred Body.Close runs once per
// record instead of only when the whole handler returns.
func indexS3Record(region, bucket, key string) error {
	log.Printf("key: %s\n", key)

	// One session serves both the S3 client and the Elasticsearch transport.
	sess := session.New(&aws.Config{Region: aws.String(region)})

	// NOTE(review): S3 event notifications deliver object keys URL-encoded;
	// if keys may contain spaces or special characters they need decoding
	// before GetObject — confirm whether apexS3 already does this.
	s3out, err := s3.New(sess).GetObject(&s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return fmt.Errorf("getting s3://%s/%s: %v", bucket, key, err)
	}
	// The body is only read, so a Close error carries no useful information.
	defer s3out.Body.Close()

	// Transport from apex-aws-signer for the Elasticsearch service, presumably
	// signing each request with the session's credentials.
	transport := signer.NewTransport(sess, elasticsearchservice.ServiceName)
	client, err := elastic.NewClient(
		elastic.SetSniff(false),
		elastic.SetURL(esUrl),
		elastic.SetScheme("https"),
		elastic.SetHttpClient(&http.Client{Transport: transport}),
	)
	if err != nil {
		return fmt.Errorf("creating elasticsearch client: %v", err)
	}

	// No index is created here; esIndex is expected to exist already or be
	// auto-created by the cluster — TODO confirm for the target domain.
	body, err := ioutil.ReadAll(s3out.Body)
	if err != nil {
		return fmt.Errorf("reading s3://%s/%s: %v", bucket, key, err)
	}
	for _, line := range strings.Split(string(body), "\n") {
		if len(line) == 0 {
			continue
		}
		if err := putDocumentIntoES(client, esIndex, line); err != nil {
			return fmt.Errorf("s3://%s/%s: %v", bucket, key, err)
		}
	}
	return nil
}

// putDocumentIntoES parses one ELB access-log record (a single line of the S3
// object) and indexes it into indexName with document type esType.
//
// A line that does not match the expected log format is logged and skipped
// (nil is returned), so one bad line does not abort the rest of the file.
// A failure reported by Elasticsearch is returned to the caller.
func putDocumentIntoES(c *elastic.Client, indexName string, line string) error {
	log.Printf("line: %s\n", line)
	doc, err := arrayToElbAccessLog(strings.Split(line, " "))
	if err != nil {
		log.Printf("skipping malformed log line: %v", err)
		return nil
	}
	put1, err := c.Index().
		Index(indexName).
		Type(esType).
		BodyJson(doc).
		Do()
	if err != nil {
		return fmt.Errorf("indexing log line into %s: %v", indexName, err)
	}
	log.Printf("Indexed elb access log %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type)
	return nil
}
// arrayToElbAccessLog builds an ElbAccessLog from one classic ELB access-log
// line that the caller has already split on single spaces.
//
// Splitting on spaces tears the double-quoted fields apart: the request
// ("GET http://host:80/path HTTP/1.1") and any user agent containing spaces.
// The tokens are therefore re-joined and re-split with quote awareness. A line
// must yield exactly 15 fields, as in this example:
//
//	2016-09-15T23:51:09.703783Z elb1 59.x.x.x:53859 10.0.0.135:80 0.000036 0.001198 0.000018 200 200 0 18 "GET http://elb1-XXX.ap-northeast-1.elb.amazonaws.com:80/welcome?lastname=f HTTP/1.1" "mackerel-http-checker/0.0.1" - -
//
// The first 12 fields are mapped onto ElbAccessLog. Request holds the complete
// request field without its surrounding quotes. The remaining fields (the user
// agent and the two trailing fields) take part in the count check only.
// An error is returned when the field count differs or a quote is unterminated.
func arrayToElbAccessLog(line []string) (*ElbAccessLog, error) {
	const wantFields = 15

	fields, err := splitQuotedFields(strings.Join(line, " "))
	if err != nil {
		return nil, err
	}
	if len(fields) != wantFields {
		return nil, fmt.Errorf("unexpected ELB log format: got %d fields, want %d", len(fields), wantFields)
	}
	return &ElbAccessLog{
		Timestamp:              fields[0],
		Elb:                    fields[1],
		ClientIpAddress:        fields[2],
		BackendIpAddress:       fields[3],
		RequestProcessingTime:  fields[4],
		BackendProcessingTime:  fields[5],
		ResponseProcessingTime: fields[6],
		ElbStatusCode:          fields[7],
		BackendStatusCode:      fields[8],
		ReceivedBytes:          fields[9],
		SentBytes:              fields[10],
		Request:                fields[11],
	}, nil
}

// splitQuotedFields splits s on single spaces. Spaces inside a double-quoted
// section do not split, and the quote characters themselves are dropped.
// Consecutive spaces outside quotes yield empty fields, like strings.Split.
// Scanning byte-wise is safe for UTF-8 because ' ' and '"' are ASCII.
func splitQuotedFields(s string) ([]string, error) {
	var fields []string
	cur := make([]byte, 0, 64)
	inQuotes := false
	for i := 0; i < len(s); i++ {
		switch c := s[i]; {
		case c == '"':
			inQuotes = !inQuotes
		case c == ' ' && !inQuotes:
			fields = append(fields, string(cur))
			cur = cur[:0]
		default:
			cur = append(cur, c)
		}
	}
	if inQuotes {
		return nil, fmt.Errorf("unterminated quoted field in ELB log line")
	}
	return append(fields, string(cur)), nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment