-
-
Save traetox/6cbed39c14945a79530e1dc5f376edfc to your computer and use it in GitHub Desktop.
Kegerator Gravwell Ingester
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"log" | |
"encoding/binary" | |
"net" | |
"time" | |
"github.com/gravwell/gravwell/v3/ingest" | |
"github.com/gravwell/gravwell/v3/ingest/entry" | |
"github.com/gravwell/gravwell/v3/ingesters/utils" | |
) | |
const ( | |
tempID uint16 = 0x12 | |
compressorID uint16 = 0x2013 | |
) | |
var ( | |
tags = []string{"compressor", "keg"} | |
targets = []string{"tcp://172.17.0.2:4023"} | |
secret = "IngestSecrets" // set this to the actual ingest secret | |
) | |
// Example demonstrates how to write a simple ingester, which generates | |
// and writes some entries to Gravwell | |
func main() { | |
// Configure the ingester | |
ingestConfig := ingest.UniformMuxerConfig{ | |
Destinations: targets, | |
Tags: tags, | |
Auth: secret, | |
PublicKey: ``, | |
PrivateKey: ``, | |
LogLevel: "WARN", | |
} | |
// Start the ingester | |
igst, err := ingest.NewUniformMuxer(ingestConfig) | |
if err != nil { | |
log.Fatalf("Failed build our ingest system: %v\n", err) | |
} | |
defer igst.Close() | |
if err := igst.Start(); err != nil { | |
log.Fatalf("Failed start our ingest system: %v\n", err) | |
} | |
log.Println("Connecting to Gravwell") | |
// Wait for connection to indexers | |
if err := igst.WaitForHot(0); err != nil { | |
log.Fatalf("Timedout waiting for backend connections: %v\n", err) | |
} | |
// Generate and send some entries | |
kegTag, err := igst.GetTag("keg") | |
if err != nil { | |
log.Fatalf("Failed to get keg tag: %v", err) | |
} | |
compTag, err := igst.GetTag("compressor") | |
if err != nil { | |
log.Fatalf("Failed to get compressor tag: %v", err) | |
} | |
// listen to incoming udp packets | |
pc, err := net.ListenPacket("udp", ":5005") | |
if err != nil { | |
log.Fatalf("Failed to listen on port 5005: %v\n", err) | |
} | |
errCh := make(chan error, 1) | |
go server(igst, pc, kegTag, compTag, errCh) | |
quitCh := utils.GetQuitChannel() | |
log.Println("Listening for messages") | |
select { | |
case err := <-errCh: //just break | |
log.Println("Got error from UDP listener", err) | |
case _ = <-quitCh: | |
//close the packet connection and wait for it to exit | |
pc.Close() | |
<-errCh | |
} | |
// Now shut down | |
if err := igst.Sync(time.Second); err != nil { | |
log.Printf("Failed to sync: %v", err) | |
} | |
igst.Close() | |
} | |
func server(igst *ingest.IngestMuxer, pc net.PacketConn, keg, comp entry.EntryTag, errCh chan error) { | |
defer pc.Close() | |
for { | |
buf := make([]byte, 2048) | |
n, addr, err := pc.ReadFrom(buf) | |
if err != nil { | |
log.Println("Bad receive", err) | |
errCh <- err | |
break | |
} | |
if n < 2 { | |
log.Println("got tiny broken packet of length", n) | |
continue | |
} | |
e := &entry.Entry{ | |
TS: entry.Now(), | |
SRC: getAddr(addr), | |
Data: buf[:n], | |
} | |
//decode the leading 16 bit id | |
id := binary.LittleEndian.Uint16(buf[0:2]) | |
switch id { | |
case tempID: | |
//we have a temperature message | |
e.Tag = keg | |
case compressorID: | |
// we have a compressor message | |
e.Tag = comp | |
default: | |
log.Printf("Got unknown message ID: %x (%d bytes)\n", id, n) | |
continue | |
} | |
if err := igst.WriteEntry(e); err != nil { | |
errCh <- err | |
log.Printf("Failed to write entry: %v", err) | |
break | |
} | |
} | |
return | |
} | |
func getAddr(addr net.Addr) net.IP { | |
if ip, _, err := net.SplitHostPort(addr.String()); err == nil { | |
return net.ParseIP(ip) | |
} | |
return nil //indexer will fill this in | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment