Skip to content

Instantly share code, notes, and snippets.

@tkuchiki
Last active May 12, 2016 06:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkuchiki/e7d95f262b089cc98311 to your computer and use it in GitHub Desktop.
Save tkuchiki/e7d95f262b089cc98311 to your computer and use it in GitHub Desktop.
fluentd のログを Redshift に COPY できるようにする(JSON)
package main
import (
"bufio"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"regexp"
"time"
)
// Time layouts, written in Go's reference-time notation.

// ISO8601 is the layout for the -begin-time/-end-time flags (see parseTime).
// It has no UTC offset element and uses unpadded fields, e.g. "2016-1-29T0:0:0".
const ISO8601 = "2006-1-2T15:4:5"

// TIMESTAMP is the "YYYY-MM-DD hh:mm:ss" text form of a Redshift TIMESTAMP.
const TIMESTAMP = "2006-01-02 15:04:05"
// fieldSplitRe matches the whitespace runs that separate the fields of a log
// line. It is compiled once here rather than on every call to parseLine.
var fieldSplitRe = regexp.MustCompile(`\s+`)

// parseLine splits a fluentd log line into at most three fields: time, tag,
// and the JSON record (the record keeps any whitespace it contains).
// A malformed line can yield fewer than three elements (an empty line yields
// [""]), so callers must check the length before indexing.
func parseLine(line string) []string {
	return fieldSplitRe.Split(line, 3)
}
// encodeJson parses jsonText as a JSON object into a generic map.
// Despite its name it decodes (json.Unmarshal); the name is kept for existing
// callers. JSON numbers come back as float64 (encoding/json's default), and a
// JSON null yields a nil map with a nil error. On failure the (possibly nil)
// map is returned alongside the error.
func encodeJson(jsonText string) (map[string]interface{}, error) {
	var obj map[string]interface{}
	if err := json.Unmarshal([]byte(jsonText), &obj); err != nil {
		return obj, err
	}
	return obj, nil
}
// decodeJson serializes v to a JSON string.
// Despite its name it encodes (json.Marshal); the name is kept for existing
// callers. Map keys are written in sorted order by encoding/json. On failure
// it returns "" and the marshal error.
func decodeJson(v interface{}) (string, error) {
	out, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	return string(out), nil
}
// parseTime parses value with the ISO8601 layout in the local time zone.
// An empty value means "no bound" and yields the zero time.Time with no error,
// which isInRange treats as an open end of the range.
func parseTime(value string) (time.Time, error) {
	if len(value) > 0 {
		return time.ParseInLocation(ISO8601, value, time.Local)
	}
	return time.Time{}, nil
}
// unix2Time converts whole seconds since the Unix epoch into a time.Time
// (local zone, per time.Unix), with no sub-second part.
func unix2Time(epoch int64) time.Time {
	const noNanos = 0
	converted := time.Unix(epoch, noNanos)
	return converted
}
// isInRange reports whether the epoch second t lies within [begin, end].
// Both bounds are inclusive; a zero time.Time disables that bound, so two
// zero bounds accept everything.
func isInRange(t int64, begin, end time.Time) bool {
	afterBegin := begin.IsZero() || t >= begin.Unix()
	beforeEnd := end.IsZero() || t <= end.Unix()
	return afterBegin && beforeEnd
}
func main() {
tag := flag.String("tag", "", "tag")
timeColumn := flag.String("time", "time", "time column name")
begin := flag.String("begin-time", "", "begin time")
end := flag.String("end-time", "", "end time")
flag.Parse()
f := os.Stdin
scanner := bufio.NewScanner(f)
for scanner.Scan() {
l := scanner.Text()
data := parseLine(l)
if *tag != "" && *tag != data[1] {
continue
}
var err error
var j map[string]interface{}
j, err = encodeJson(data[2])
if err != nil {
log.Println(err)
}
var beginT, endT time.Time
beginT, err = parseTime(*begin)
if err != nil {
log.Println(err)
continue
}
endT, err = parseTime(*end)
if err != nil {
log.Println(err)
continue
}
// epoch ではなく timestamp 型にしたい場合は、変換する処理を書く
epoch := int64(j[*timeColumn].(float64))
// t, err = unix2Time(epoch)
// timestamp := t.Format(TIMESTAMP)
// 指定時間の範囲外だったら処理しない
if !isInRange(epoch, beginT, endT) {
continue
}
delete(j, *timeColumn)
j[*timeColumn] = epoch
//j[*timeColumn] = timestamp
var jsonText string
jsonText, err = decodeJson(j)
if err != nil {
log.Println(err)
continue
}
fmt.Println(jsonText)
}
}
package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"regexp"
"time"
)
// ISO8601 is the layout of the timestamp that starts each fluentd line, e.g.
// "2016-01-25T10:00:07+09:00"; the "-07:00" element requires a numeric offset.
const ISO8601 = "2006-1-2T15:4:5-07:00"

// TIMESTAMP is the "YYYY-MM-DD hh:mm:ss" form that addTime writes for Redshift.
const TIMESTAMP = "2006-01-02 15:04:05"
// lineFieldRe matches the whitespace runs that separate the fields of a log
// line. It is compiled once here rather than on every call to parseLine.
var lineFieldRe = regexp.MustCompile(`\s+`)

// parseLine splits a fluentd log line into at most three fields: time, tag,
// and the JSON record (the record keeps any whitespace it contains).
//
//	ex: line = `2016-01-25T10:00:07+09:00 tag {"json": "data", "foo": "bar"}`
//
// A malformed line can yield fewer than three elements (an empty line yields
// [""]), so callers must check the length before indexing.
func parseLine(line string) []string {
	return lineFieldRe.Split(line, 3)
}
// Patterns used by addTime, compiled once instead of on every call.
var (
	// closingBraceRe matches an object's final "}" plus any trailing whitespace.
	closingBraceRe = regexp.MustCompile(`}\s*$`)
	// emptyObjectRe matches a JSON object with no members, e.g. "{}" or "{ } ".
	emptyObjectRe = regexp.MustCompile(`^\s*\{\s*\}\s*$`)
)

// addTime appends a `"column": "<t as TIMESTAMP>"` member to the JSON object
// text j and returns the result. The time is formatted in t's own location.
//
// It works textually (the record is never decoded), so j is assumed to be a
// single JSON object on one line. An empty object gets the member without a
// leading comma, and whitespace after the final "}" is tolerated. Input that
// does not end in "}" is returned unchanged.
func addTime(j string, column string, t time.Time) string {
	// %q quotes and escapes the column name; the formatted time is plain ASCII.
	member := fmt.Sprintf("%q: %q", column, t.Format(TIMESTAMP))
	if emptyObjectRe.MatchString(j) {
		return "{" + member + "}"
	}
	// Literal replacement: a "$" in the column name must not be expanded as a
	// regexp submatch reference.
	return closingBraceRe.ReplaceAllLiteralString(j, ","+member+"}")
}
// main reads fluentd log lines of the form
//
//	2016-01-25T10:00:07+09:00 tag {"json": "data", "foo": "bar"}
//
// from stdin. For each record whose tag equals -tag (an empty -tag selects
// every record), it prints the JSON payload with the line's timestamp added
// under the -time column as a "YYYY-MM-DD hh:mm:ss" string in the line's own
// UTC offset. Lines with too few fields or an unparseable time are logged and
// skipped. A read error terminates the program.
func main() {
	tag := flag.String("tag", "", "tag")
	timeColumn := flag.String("time", "time", "time column name")
	flag.Parse()

	scanner := bufio.NewScanner(os.Stdin)
	// Allow records longer than bufio.Scanner's default 64 KiB token limit.
	scanner.Buffer(make([]byte, 0, 64*1024), 16*1024*1024)
	for scanner.Scan() {
		line := scanner.Text()
		data := parseLine(line)
		if len(data) < 3 {
			log.Printf("skipping malformed line: %q", line)
			continue
		}
		// Filter before parsing, so malformed lines of other tags are ignored.
		if *tag != "" && *tag != data[1] {
			continue
		}
		t, err := time.Parse(ISO8601, data[0])
		if err != nil {
			log.Printf("skipping line with unparseable time: %v", err)
			continue
		}
		fmt.Println(addTime(data[2], *timeColumn, t))
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}

build

go build -o fluentlog2json

CREATE TABLE

CREATE TABLE mytable (
  id INT,
  msg TEXT,
  time TIMESTAMP
)
distkey(id)
sortkey(time);

COPY 用の json 生成

$ cat fluentd.log
2016-01-29T00:00:00+09:00 TAG {"id":1,"msg":"foo"}
2016-01-29T01:00:00+09:00 TAG {"id":2, "msg":"bar"}
$ cat fluentd.log | ./fluentlog2json -tag TAG > log.json
$ cat log.json
{"id":1,"msg":"foo","time": "2016-01-29 00:00:00"}
{"id":2, "msg":"bar","time": "2016-01-29 01:00:00"}

S3 にアップロード

$ aws s3 cp log.json s3://mybucket/

S3 から json を import(COPY)

psql で redshift につなぐ

psql -h REDSHIFT_ENDPOINT -U USERNAME -p PORT DBNAME

$ psql -h REDSHIFT_ENDPOINT -U testuser -p 5439 mydb
ユーザ testuser のパスワード:

COPY を実行

mydb=# COPY mytable FROM 's3://mybucket/log.json' WITH 
CREDENTIALS 'aws_access_key_id=<access-key-id>;aws_secret_access_key=<secret-access-key>' 
FORMAT AS JSON 'auto';
INFO:  Load into table 'mytable' completed, 2 record(s) loaded successfully.
COPY

mydb=# SELECT * FROM mytable;
  2 | bar | 2016-01-29 01:00:00
  1 | foo | 2016-01-29 00:00:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment