Skip to content

Instantly share code, notes, and snippets.

@ecc256
Created May 8, 2018 15:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ecc256/b5b2c0fd7fa10f7a99da8e8e7bd36cab to your computer and use it in GitHub Desktop.
Save ecc256/b5b2c0fd7fa10f7a99da8e8e7bd36cab to your computer and use it in GitHub Desktop.
testbeat
package beater
import (
"strconv"
"time"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"testbeat/config"
)
var (
logX *logp.Logger
lastPublishedID int
isRegistryOpen bool
)
type test struct {
done chan struct{}
config config.Config
client beat.Client
}
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, err
}
bt := &test{
done: make(chan struct{}),
config: config,
}
return bt, nil
}
func (bt *test) ackLastEvent(data interface{}) {
if data != nil {
if lastAckID, ok := data.(int); ok {
if lastAckID == lastPublishedID {
logX.Infof("==> Last Published event IDs is ACKed: %d", lastAckID)
}
}
}
if !isRegistryOpen {
logX.Warn("==> Cannot update registry...")
}
}
func (bt *test) publishRow(id int) {
event := beat.Event{
Timestamp: time.Now(),
Meta: common.MapStr{"id": strconv.FormatInt(int64(id), 10)},
Fields: common.MapStr{"id": id},
Private: id,
}
bt.client.Publish(event)
lastPublishedID = id
}
func (bt *test) run() error {
for count, eventCount := 0, bt.config.EventCount; count < eventCount; count++ {
bt.publishRow(count + 100000000)
}
logX.Infof("==> Events published: %d", lastPublishedID)
//time.Sleep(bt.config.RunSleep)
return nil
}
func (bt *test) Run(b *beat.Beat) error {
logX = logp.NewLogger(b.Info.Beat)
isRegistryOpen = true
defer func() { isRegistryOpen = false }()
var err error
bt.client, err = b.Publisher.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend, // infinite retry
WaitClose: bt.config.WaitClose, // additional wait on shutdown
ACKLastEvent: bt.ackLastEvent,
})
if err != nil {
return err
}
defer bt.client.Close()
return bt.run()
}
func (bt *test) Stop() {
bt.client.Close()
close(bt.done)
}
package cmd
import (
"testbeat/beater"
cmd "github.com/elastic/beats/libbeat/cmd"
)
// Name of this beat
var Name = "test"
// RootCmd to handle beats cli
var RootCmd = cmd.GenRootCmd(Name, "", beater.New)
package config
import "time"
type Config struct {
EventCount int `config:"EventCount"`
WaitClose time.Duration `config:"WaitClose"`
RunSleep time.Duration `config:"RunSleep"`
}
var DefaultConfig = Config{
EventCount: 10,
WaitClose: 1 * time.Second,
RunSleep: 3 * time.Second,
}
package main
import (
"os"
"testbeat/cmd"
)
func main() {
if err := cmd.RootCmd.Execute(); err != nil {
os.Exit(1)
}
}
test:
#EventCount: 2
EventCount: 100
#EventCount: 1000000
WaitClose: 3s
RunSleep: 3s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment