Created
September 17, 2016 10:56
-
-
Save tfutada/99124a5c0d8ae178e808ac03b455e516 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"strings" | |
"fmt" | |
"log" | |
"net/http" | |
"io/ioutil" | |
"github.com/aws/aws-sdk-go/aws/session" | |
"github.com/aws/aws-sdk-go/aws" | |
"github.com/aws/aws-sdk-go/service/s3" | |
"github.com/aws/aws-sdk-go/service/elasticsearchservice" | |
apex "github.com/apex/go-apex" | |
apexS3 "github.com/apex/go-apex/s3" | |
"gopkg.in/olivere/elastic.v3" | |
"github.com/edoardo849/apex-aws-signer" | |
) | |
// Here is the basic concept of this code:
// 1. Receive an S3 event, which contains an S3 bucket name and the file name (key) of an access log.
// 2. Loop over the records in the S3 event and, for each record:
// 3. Fetch the S3 object with the given file name.
// 4. Loop over the lines of the fetched file and, for each line:
// 5. Parse the line, which represents a single ELB access log entry.
// 6. Put it into Elasticsearch as a document.
// Elasticsearch destination settings used when indexing access log lines.
const (
	// esUrl is the endpoint passed to the Elasticsearch client.
	// NOTE(review): "XXX" looks like a placeholder for the real domain name — confirm before deploying.
	esUrl = "https://XXX.ap-northeast-1.es.amazonaws.com"
	// esIndex is the name of the index the documents are written to.
	esIndex = "elblogidx"
	// esType is the document type set on every indexed document.
	esType = "elb"
)
// ElbAccessLog is the document indexed into Elasticsearch for one ELB access
// log line. Every field is kept as the raw string token taken from the
// space-separated log line; no numeric or time parsing is performed.
type ElbAccessLog struct {
	// Timestamp is field 0 of the log line, e.g. "2016-09-15T23:51:09.703783Z".
	Timestamp string `json:"timestamp"`
	// Elb is field 1, the load balancer name, e.g. "elb1".
	Elb string `json:"elb"`
	// ClientIpAddress is field 2, in "address:port" form.
	ClientIpAddress string `json:"client_ip_address"`
	// BackendIpAddress is field 3, in "address:port" form.
	BackendIpAddress string `json:"backend_ip_address"`
	// RequestProcessingTime is field 4, e.g. "0.000036".
	RequestProcessingTime string `json:"request_processing_time"`
	// BackendProcessingTime is field 5, e.g. "0.001198".
	BackendProcessingTime string `json:"backend_processing_time"`
	// ResponseProcessingTime is field 6, e.g. "0.000018".
	ResponseProcessingTime string `json:"response_processing_time"`
	// ElbStatusCode is field 7, e.g. "200".
	ElbStatusCode string `json:"elb_status_code"`
	// BackendStatusCode is field 8, e.g. "200".
	BackendStatusCode string `json:"backend_status_code"`
	// ReceivedBytes is field 9, e.g. "0".
	ReceivedBytes string `json:"received_bytes"`
	// SentBytes is field 10, e.g. "18".
	SentBytes string `json:"sent_bytes"`
	// Request is the request portion of the log line, re-joined with spaces
	// from the tokens starting at field 11.
	Request string `json:"request"`
}
func main() { | |
apexS3.HandleFunc(func(event *apexS3.Event, ctx *apex.Context) error { | |
// Iterate all the S3 events, while extracting S3 bucket and filename. | |
for _, rec := range event.Records { | |
// S3 session | |
svc := s3.New(session.New(&aws.Config{Region: aws.String(rec.AWSRegion)})) | |
log.Printf("key: %s\n", rec.S3.Object.Key) | |
// get the S3 object, i.e. file content | |
s3out, err := svc.GetObject(&s3.GetObjectInput{ | |
Bucket: aws.String(rec.S3.Bucket.Name), | |
Key: aws.String(rec.S3.Object.Key), | |
}) | |
if (err != nil) { | |
log.Fatal(err) | |
} | |
// working with Elasticsearch | |
transport := signer.NewTransport(session.New(&aws.Config{Region: aws.String(rec.AWSRegion)}), elasticsearchservice.ServiceName) | |
httpClient := &http.Client{ | |
Transport: transport, | |
} | |
// Use the client with Olivere's elastic client | |
client, err := elastic.NewClient( | |
elastic.SetSniff(false), | |
elastic.SetURL(esUrl), | |
elastic.SetScheme("https"), | |
elastic.SetHttpClient(httpClient), | |
) | |
if err != nil { | |
panic(err) | |
} | |
// Create an index. | |
indexName := esIndex | |
//createIndex, err := client.CreateIndex(indexName).Do() | |
//if err != nil { | |
// panic(err) | |
//} | |
//if !createIndex.Acknowledged { | |
// panic(err) | |
//} | |
// Read the body of the S3 object. | |
bytes, err := ioutil.ReadAll(s3out.Body) | |
if (err != nil) { | |
log.Fatal(err) | |
} | |
for _, line := range strings.Split(string(bytes), "\n") { | |
if len(line) != 0 { | |
putDocumentIntoES(client, indexName, line) | |
} | |
} | |
} | |
return nil | |
}) | |
} | |
// Index a recode of the S3 object, which is a single HTTP access log record. | |
func putDocumentIntoES(c *elastic.Client, indexName string, line string) error { | |
log.Printf("line: %s\n", line) | |
doc, err := arrayToElbAccessLog(strings.Split(line, " ")) | |
if err != nil { | |
log.Fatal(err) | |
return nil | |
} | |
put1, err := c.Index(). | |
Index(indexName). | |
Type(esType). | |
BodyJson(doc). | |
Do() | |
if err != nil { | |
panic(err) | |
} | |
log.Printf("Indexed elb access log %s to index %s, type %s\n", put1.Id, put1.Index, put1.Type) | |
return nil | |
} | |
func arrayToElbAccessLog(line []string) (*ElbAccessLog, error) { | |
const ExpectedELBLog = `2016-09-15T23:51:09.703783Z elb1 59.x.x.x:53859 10.0.0.135:80 0.000036 0.001198 0.000018 200 200 0 18 "GET http://elb1-XXX.ap-northeast-1.elb.amazonaws.com:80/welcome?lastname=f HTTP/1.1" "mackerel-http-checker/0.0.1" - -` | |
if len(line) != len(strings.Split(ExpectedELBLog, " ")) { | |
return nil, fmt.Errorf("Error: %s", "Unexpected log format.") | |
} | |
elb := &ElbAccessLog{ | |
Timestamp: line[0], | |
Elb: line[1], | |
ClientIpAddress: line[2], | |
BackendIpAddress: line[3], | |
RequestProcessingTime: line[4], | |
BackendProcessingTime: line[5], | |
ResponseProcessingTime: line[6], | |
ElbStatusCode: line[7], | |
BackendStatusCode: line[8], | |
ReceivedBytes: line[9], | |
SentBytes: line[10], | |
Request: strings.Join(line[11:13], " "), | |
} | |
return elb, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment