Skip to content

Instantly share code, notes, and snippets.

@korc
Last active February 25, 2018 22:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save korc/f6756681a11728dc7ed85df07af83039 to your computer and use it in GitHub Desktop.
Save korc/f6756681a11728dc7ed85df07af83039 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"database/sql"
"flag"
"fmt"
"log"
"math/big"
"net"
"time"
"github.com/lib/pq"
)
type AuditMessage struct {
Node string
Type string
Timestamp time.Time
MessageId int
Message string
}
func (m *AuditMessage) Scan(s fmt.ScanState, verb rune) error {
tok, err := s.Token(true, nil)
if err != nil {
return err
}
_, err = fmt.Sscanf(string(tok), "node=%s", &m.Node)
if err != nil {
return err
}
tok, err = s.Token(true, nil)
if err != nil {
return err
}
_, err = fmt.Sscanf(string(tok), "type=%s", &m.Type)
if err != nil {
return err
}
tok, err = s.Token(true, nil)
if err != nil {
return err
}
tm := new(big.Float)
_, err = fmt.Sscanf(string(tok), "msg=audit(%v:%d):", tm, &m.MessageId)
if err != nil {
return err
}
nanoSecs, _ := new(big.Float).Mul(tm, new(big.Float).SetInt64(10e8)).Int64()
m.Timestamp = time.Unix(0, nanoSecs)
tok, err = s.Token(true, func(r rune) bool {
return r != '\n'
})
if err != nil {
return err
}
m.Message = string(tok)
return nil
}
func main() {
auditSocket := flag.String("audit", "/var/run/audispd_events", "Audit socket")
dsn := flag.String("dsn", "sslmode=disable", "Postgresql DSN")
dbTable := flag.String("table", "log", "Table to insert into")
tableCreateSql := flag.String("createSql", "create table \"%s\" (id bigserial primary key, node text, stamp timestamptz, msg_id bigint, msg_type text, msg text)", "Table creation SQL")
flag.Parse()
conn, err := net.Dial("unix", *auditSocket)
if err != nil {
log.Fatalf("Cannot open audit socket %#v: %s\n", *auditSocket, err)
}
defer conn.Close()
db, err := sql.Open("postgres", *dsn)
if err != nil {
log.Fatal("Cannot open database: ", err)
}
defer db.Close()
_, err = db.Exec(fmt.Sprintf("SELECT 1 FROM \"%s\" WHERE 1=0", *dbTable))
if err != nil {
if err.(*pq.Error).Code == "42P01" {
_, err := db.Exec(fmt.Sprintf(*tableCreateSql, *dbTable))
if err != nil {
log.Fatalf("Creating table %#v failed: %s\n", *dbTable, err)
}
log.Printf("Created table %#v in database", *dbTable)
} else {
log.Fatalf("Could not access table %#v: %s\n", *dbTable, err)
}
}
insertTmpl := "INSERT INTO \"%s\" (node, stamp, msg_id, msg_type, msg) VALUES ($1, $2, $3, $4, $5) RETURNING id"
sqlInsert, err := db.Prepare(fmt.Sprintf(insertTmpl, *dbTable))
if err != nil {
log.Fatal("Cannot prepare query: ", err)
}
defer sqlInsert.Close()
scanner := bufio.NewScanner(bufio.NewReader(conn))
for scanner.Scan() {
line := scanner.Text()
entry := &AuditMessage{}
fmt.Sscan(line, entry)
_, err := sqlInsert.Exec(entry.Node, entry.Timestamp, entry.MessageId, entry.Type, entry.Message)
if err != nil {
log.Print("Error inserting data: ", err)
log.Printf("Line: %#v\n", line)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment