Skip to content

Instantly share code, notes, and snippets.

@wheresalice
Created August 17, 2018 13:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wheresalice/c96f604f64a3dd93a5967424e75844ac to your computer and use it in GitHub Desktop.
Save wheresalice/c96f604f64a3dd93a5967424e75844ac to your computer and use it in GitHub Desktop.
package main
import (
"github.com/Jeffail/benthos/lib/stream"
"github.com/Jeffail/benthos/lib/input"
"github.com/Jeffail/benthos/lib/output"
"github.com/Jeffail/benthos/lib/types"
"time"
"os"
"os/signal"
"syscall"
"log"
"bytes"
)
type TimeStamp struct{}
func (TimeStamp) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
// Create a copy of the original message
result := m.Copy()
// For each message part replace its contents with the timestamp we received it at
result.Iter(func(i int, part types.Part) error {
var buf bytes.Buffer
buf.WriteString(m.CreatedAt().String())
part.Set(buf.Bytes())
return nil
})
return []types.Message{result}, nil
}
func main() {
conf := stream.NewConfig()
conf.Input.Type = input.TypeKafka
conf.Input.Kafka.Addresses = []string{"localhost:9092"}
conf.Input.Kafka.Topic = "test.topic"
conf.Output.Type = output.TypeSTDOUT
s, err := stream.New(conf, stream.OptAddProcessors(func() (types.Processor, error) {
return TimeStamp{}, nil
}))
if err != nil {
panic(err)
}
defer s.Stop(time.Second)
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
select {
case <-sigChan:
log.Println("Received SIGTERM, the service is closing")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment