Last active
February 25, 2018 22:47
-
-
Save korc/f6756681a11728dc7ed85df07af83039 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"database/sql" | |
"flag" | |
"fmt" | |
"log" | |
"math/big" | |
"net" | |
"time" | |
"github.com/lib/pq" | |
) | |
type AuditMessage struct { | |
Node string | |
Type string | |
Timestamp time.Time | |
MessageId int | |
Message string | |
} | |
func (m *AuditMessage) Scan(s fmt.ScanState, verb rune) error { | |
tok, err := s.Token(true, nil) | |
if err != nil { | |
return err | |
} | |
_, err = fmt.Sscanf(string(tok), "node=%s", &m.Node) | |
if err != nil { | |
return err | |
} | |
tok, err = s.Token(true, nil) | |
if err != nil { | |
return err | |
} | |
_, err = fmt.Sscanf(string(tok), "type=%s", &m.Type) | |
if err != nil { | |
return err | |
} | |
tok, err = s.Token(true, nil) | |
if err != nil { | |
return err | |
} | |
tm := new(big.Float) | |
_, err = fmt.Sscanf(string(tok), "msg=audit(%v:%d):", tm, &m.MessageId) | |
if err != nil { | |
return err | |
} | |
nanoSecs, _ := new(big.Float).Mul(tm, new(big.Float).SetInt64(10e8)).Int64() | |
m.Timestamp = time.Unix(0, nanoSecs) | |
tok, err = s.Token(true, func(r rune) bool { | |
return r != '\n' | |
}) | |
if err != nil { | |
return err | |
} | |
m.Message = string(tok) | |
return nil | |
} | |
func main() { | |
auditSocket := flag.String("audit", "/var/run/audispd_events", "Audit socket") | |
dsn := flag.String("dsn", "sslmode=disable", "Postgresql DSN") | |
dbTable := flag.String("table", "log", "Table to insert into") | |
tableCreateSql := flag.String("createSql", "create table \"%s\" (id bigserial primary key, node text, stamp timestamptz, msg_id bigint, msg_type text, msg text)", "Table creation SQL") | |
flag.Parse() | |
conn, err := net.Dial("unix", *auditSocket) | |
if err != nil { | |
log.Fatalf("Cannot open audit socket %#v: %s\n", *auditSocket, err) | |
} | |
defer conn.Close() | |
db, err := sql.Open("postgres", *dsn) | |
if err != nil { | |
log.Fatal("Cannot open database: ", err) | |
} | |
defer db.Close() | |
_, err = db.Exec(fmt.Sprintf("SELECT 1 FROM \"%s\" WHERE 1=0", *dbTable)) | |
if err != nil { | |
if err.(*pq.Error).Code == "42P01" { | |
_, err := db.Exec(fmt.Sprintf(*tableCreateSql, *dbTable)) | |
if err != nil { | |
log.Fatalf("Creating table %#v failed: %s\n", *dbTable, err) | |
} | |
log.Printf("Created table %#v in database", *dbTable) | |
} else { | |
log.Fatalf("Could not access table %#v: %s\n", *dbTable, err) | |
} | |
} | |
insertTmpl := "INSERT INTO \"%s\" (node, stamp, msg_id, msg_type, msg) VALUES ($1, $2, $3, $4, $5) RETURNING id" | |
sqlInsert, err := db.Prepare(fmt.Sprintf(insertTmpl, *dbTable)) | |
if err != nil { | |
log.Fatal("Cannot prepare query: ", err) | |
} | |
defer sqlInsert.Close() | |
scanner := bufio.NewScanner(bufio.NewReader(conn)) | |
for scanner.Scan() { | |
line := scanner.Text() | |
entry := &AuditMessage{} | |
fmt.Sscan(line, entry) | |
_, err := sqlInsert.Exec(entry.Node, entry.Timestamp, entry.MessageId, entry.Type, entry.Message) | |
if err != nil { | |
log.Print("Error inserting data: ", err) | |
log.Printf("Line: %#v\n", line) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment