Skip to content

Instantly share code, notes, and snippets.

@sj14
Last active November 20, 2023 13:08
Show Gist options
  • Save sj14/9890648434edfe18d06aa00666d48377 to your computer and use it in GitHub Desktop.
Save sj14/9890648434edfe18d06aa00666d48377 to your computer and use it in GitHub Desktop.
confluent-kafka-go sync produce
// syncProduce enqueues a single message on producer for topic and blocks
// until one of the following happens:
//
//   - a delivery event for the message arrives, in which case the delivery
//     error (nil on success) is returned;
//   - ctx is done, in which case ctx.Err() is returned;
//   - the internal 10 second timeout elapses, in which case a
//     "produce timeout" error is returned.
//
// It returns an error immediately if producer is nil or closed, or if
// Produce itself rejects the message.
//
// NOTE(review): when this function returns because of ctx or the timeout,
// the message has already been handed to the producer and presumably may
// still be delivered later; callers should not treat those errors as proof
// that the message was not written. Confirm against the producer's semantics.
func syncProduce(ctx context.Context, producer *kafka.Producer, topic string, key, data []byte) error {
	// produceTimeout is the upper bound on how long we wait for the delivery
	// event, independent of any deadline carried by ctx.
	const produceTimeout = 10 * time.Second

	if producer == nil {
		return errors.New("nil producer")
	}
	if producer.IsClosed() {
		return errors.New("closed producer")
	}

	// Capacity 1 so a delivery event that arrives after we have already
	// returned (ctx done or timeout) can still be placed on the channel
	// without a receiver. Do not close the channel: an event could still be
	// sent to it afterwards, and sending on a closed channel panics.
	deliveryChan := make(chan kafka.Event, 1)

	err := producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            key,
		Value:          data,
	}, deliveryChan)
	if err != nil {
		return err
	}

	// Use an explicit timer instead of time.After so it can be stopped as
	// soon as another select case wins, rather than lingering until it fires.
	timer := time.NewTimer(produceTimeout)
	defer timer.Stop()

	select {
	case delivery := <-deliveryChan:
		switch ev := delivery.(type) {
		case *kafka.Message:
			// Per-message delivery result; nil Error means success.
			return ev.TopicPartition.Error
		case kafka.Error:
			return ev
		default:
			return errors.New("unknown delivery event")
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return errors.New("produce timeout")
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment