Created
August 17, 2018 13:52
-
-
Save wheresalice/c96f604f64a3dd93a5967424e75844ac to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"github.com/Jeffail/benthos/lib/stream" | |
"github.com/Jeffail/benthos/lib/input" | |
"github.com/Jeffail/benthos/lib/output" | |
"github.com/Jeffail/benthos/lib/types" | |
"time" | |
"os" | |
"os/signal" | |
"syscall" | |
"log" | |
"bytes" | |
) | |
type TimeStamp struct{} | |
func (TimeStamp) ProcessMessage(m types.Message) ([]types.Message, types.Response) { | |
// Create a copy of the original message | |
result := m.Copy() | |
// For each message part replace its contents with the timestamp we received it at | |
result.Iter(func(i int, part types.Part) error { | |
var buf bytes.Buffer | |
buf.WriteString(m.CreatedAt().String()) | |
part.Set(buf.Bytes()) | |
return nil | |
}) | |
return []types.Message{result}, nil | |
} | |
func main() { | |
conf := stream.NewConfig() | |
conf.Input.Type = input.TypeKafka | |
conf.Input.Kafka.Addresses = []string{"localhost:9092"} | |
conf.Input.Kafka.Topic = "test.topic" | |
conf.Output.Type = output.TypeSTDOUT | |
s, err := stream.New(conf, stream.OptAddProcessors(func() (types.Processor, error) { | |
return TimeStamp{}, nil | |
})) | |
if err != nil { | |
panic(err) | |
} | |
defer s.Stop(time.Second) | |
sigChan := make(chan os.Signal, 1) | |
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) | |
select { | |
case <-sigChan: | |
log.Println("Received SIGTERM, the service is closing") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment