Skip to content

Instantly share code, notes, and snippets.

@apurvam
Last active November 30, 2016 00:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apurvam/9e78f9f520669bc69459b6475a9cacfc to your computer and use it in GitHub Desktop.
Save apurvam/9e78f9f520669bc69459b6475a9cacfc to your computer and use it in GitHub Desktop.
Extending the Kafka Producer to support transactions.
public interface Producer<K,V> extends Closeable {
/**
* Needs to be called before any of the other transaction methods. Assumes that
* the producer.app.id is specified in the producer configuration.
*
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer
* are committed or rolled back.
* 2. Gets the internal producer id and epoch, used in all future transactional
* messages issued by the producer.
*
* @throws IllegalStateException if the appId for the producer is not set
* in the configuration.
*/
void initTransactions() throws IllegalStateException;
/**
* Should be called before the start of each new transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transaction.app.id is active.
*/
void beginTransaction() throws ProducerFencedException;
/**
* Sends a list of consumed offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* consumed only if the transaction is committed successfully.
*
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern.
*
* @throws ProducerFencedException if another producer is with the same
* transaction.app.id is active.
*/
void sendOffsets(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
/**
* Commits the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transaction.app.id is active.
*/
void commitTransaction() throws ProducerFencedException;
/**
* Aborts the ongoing transaction.
*
* @throws ProducerFencedException if another producer is with the same
* transaction.app.id is active.
*/
void abortTransaction() throws ProducerFencedException;
/**
* Send the given record asynchronously and return a future which will eventually contain the response information.
*
* @param record The record to send
* @return A future which will eventually contain the response information
*
* @throws UnrecognizedMessageException if the broker detects data loss: ie. previous messages which we expect
* to be committed are detected to be missing. This is a fatal error.
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record) throws UnrecognizedMessageException;
/**
* Send a record and invoke the given callback when the record has been acknowledged by the server
*
* @throws UnrecognizedMessageException if the broker detects data loss: ie. previous messages which we expect
* to be committed are detected to be missing. This is a fatal error.
*/
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) throws UnrecognizedMessageException;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment