Skip to content

Instantly share code, notes, and snippets.

@traetox
Created November 4, 2020 15:56
Show Gist options
  • Save traetox/6cbed39c14945a79530e1dc5f376edfc to your computer and use it in GitHub Desktop.
Save traetox/6cbed39c14945a79530e1dc5f376edfc to your computer and use it in GitHub Desktop.
Kegerator Gravwell Ingester
package main
import (
"log"
"encoding/binary"
"net"
"time"
"github.com/gravwell/gravwell/v3/ingest"
"github.com/gravwell/gravwell/v3/ingest/entry"
"github.com/gravwell/gravwell/v3/ingesters/utils"
)
const (
tempID uint16 = 0x12
compressorID uint16 = 0x2013
)
var (
tags = []string{"compressor", "keg"}
targets = []string{"tcp://172.17.0.2:4023"}
secret = "IngestSecrets" // set this to the actual ingest secret
)
// Example demonstrates how to write a simple ingester, which generates
// and writes some entries to Gravwell
func main() {
// Configure the ingester
ingestConfig := ingest.UniformMuxerConfig{
Destinations: targets,
Tags: tags,
Auth: secret,
PublicKey: ``,
PrivateKey: ``,
LogLevel: "WARN",
}
// Start the ingester
igst, err := ingest.NewUniformMuxer(ingestConfig)
if err != nil {
log.Fatalf("Failed build our ingest system: %v\n", err)
}
defer igst.Close()
if err := igst.Start(); err != nil {
log.Fatalf("Failed start our ingest system: %v\n", err)
}
log.Println("Connecting to Gravwell")
// Wait for connection to indexers
if err := igst.WaitForHot(0); err != nil {
log.Fatalf("Timedout waiting for backend connections: %v\n", err)
}
// Generate and send some entries
kegTag, err := igst.GetTag("keg")
if err != nil {
log.Fatalf("Failed to get keg tag: %v", err)
}
compTag, err := igst.GetTag("compressor")
if err != nil {
log.Fatalf("Failed to get compressor tag: %v", err)
}
// listen to incoming udp packets
pc, err := net.ListenPacket("udp", ":5005")
if err != nil {
log.Fatalf("Failed to listen on port 5005: %v\n", err)
}
errCh := make(chan error, 1)
go server(igst, pc, kegTag, compTag, errCh)
quitCh := utils.GetQuitChannel()
log.Println("Listening for messages")
select {
case err := <-errCh: //just break
log.Println("Got error from UDP listener", err)
case _ = <-quitCh:
//close the packet connection and wait for it to exit
pc.Close()
<-errCh
}
// Now shut down
if err := igst.Sync(time.Second); err != nil {
log.Printf("Failed to sync: %v", err)
}
igst.Close()
}
func server(igst *ingest.IngestMuxer, pc net.PacketConn, keg, comp entry.EntryTag, errCh chan error) {
defer pc.Close()
for {
buf := make([]byte, 2048)
n, addr, err := pc.ReadFrom(buf)
if err != nil {
log.Println("Bad receive", err)
errCh <- err
break
}
if n < 2 {
log.Println("got tiny broken packet of length", n)
continue
}
e := &entry.Entry{
TS: entry.Now(),
SRC: getAddr(addr),
Data: buf[:n],
}
//decode the leading 16 bit id
id := binary.LittleEndian.Uint16(buf[0:2])
switch id {
case tempID:
//we have a temperature message
e.Tag = keg
case compressorID:
// we have a compressor message
e.Tag = comp
default:
log.Printf("Got unknown message ID: %x (%d bytes)\n", id, n)
continue
}
if err := igst.WriteEntry(e); err != nil {
errCh <- err
log.Printf("Failed to write entry: %v", err)
break
}
}
return
}
func getAddr(addr net.Addr) net.IP {
if ip, _, err := net.SplitHostPort(addr.String()); err == nil {
return net.ParseIP(ip)
}
return nil //indexer will fill this in
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment