Skip to content

Instantly share code, notes, and snippets.

@TonPC64
Created February 24, 2022 14:51
Show Gist options
  • Save TonPC64/8ad671c8aa7e7fe0224f78e2503c6e98 to your computer and use it in GitHub Desktop.
Save TonPC64/8ad671c8aa7e7fe0224f78e2503c6e98 to your computer and use it in GitHub Desktop.
func printMessage(msg *sarama.ConsumerMessage) {
// Extract tracing info from message
ctx := otel.GetTextMapPropagator().Extract(context.Background(), otelsarama.NewConsumerMessageCarrier(msg))
tr := otel.GetTracerProvider().Tracer("consumer")
_, span := tr.Start(ctx, "consume message", trace.WithAttributes(
semconv.MessagingOperationProcess,
))
defer span.End()
log.Println("Successful to read message: ", string(msg.Value))
}
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
printMessage(message)
session.MarkMessage(message, "")
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment