Skip to content

Instantly share code, notes, and snippets.

@matzew
Created July 13, 2018 09:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matzew/b98dbacf7f40f27c9f666b736a2428d3 to your computer and use it in GitHub Desktop.
Save matzew/b98dbacf7f40f27c9f666b736a2428d3 to your computer and use it in GitHub Desktop.
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.List;
import java.util.Properties;
/**
 * Fluent builder that collects producer settings in a {@link Properties} object and
 * passes them to {@link KafkaProducer} in {@link #build()}.
 *
 * <p>Each setter stores its argument under the matching {@link ProducerConfig} key and
 * returns this builder (with its type parameters intact) so calls can be chained.
 * Apart from rejecting {@code null}, values are not validated here.
 *
 * @param <K> key type of the producer returned by {@link #build()}
 * @param <V> value type of the producer returned by {@link #build()}
 */
public class ProducerBuilder<K, V> {

    /** Settings accumulated so far; handed unchanged to the producer in {@link #build()}. */
    final Properties producerConfig = new Properties();

    /**
     * Stores {@code value} under {@code key} and returns this builder for chaining.
     *
     * @throws NullPointerException if {@code value} is {@code null}
     */
    private ProducerBuilder<K, V> set(final String key, final Object value) {
        // Properties (a Hashtable) rejects null values anyway; fail with a message naming the key.
        java.util.Objects.requireNonNull(value, key + " must not be null");
        producerConfig.put(key, value);
        return this;
    }

    /**
     * Sets {@link ProducerConfig#BOOTSTRAP_SERVERS_CONFIG}.
     *
     * @param bootstrapsServers value to store
     * @return this builder
     * @throws NullPointerException if {@code bootstrapsServers} is {@code null}
     */
    public ProducerBuilder<K, V> bootstrapServers(final String bootstrapsServers) {
        return set(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapsServers);
    }

    /**
     * Sets {@link ProducerConfig#METADATA_MAX_AGE_CONFIG}.
     *
     * @param metadataMaxAgeMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> metadataMaxAgeMs(final long metadataMaxAgeMs) {
        return set(ProducerConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAgeMs);
    }

    /**
     * Sets {@link ProducerConfig#BATCH_SIZE_CONFIG}.
     *
     * @param batchSize value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> batchSize(final int batchSize) {
        return set(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    }

    /**
     * Sets {@link ProducerConfig#ACKS_CONFIG}.
     *
     * @param acks value to store
     * @return this builder
     * @throws NullPointerException if {@code acks} is {@code null}
     */
    public ProducerBuilder<K, V> acks(final String acks) {
        return set(ProducerConfig.ACKS_CONFIG, acks);
    }

    /**
     * Sets {@link ProducerConfig#LINGER_MS_CONFIG}.
     *
     * @param lingerMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> lingerMs(final long lingerMs) {
        return set(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    }

    /**
     * Sets {@link ProducerConfig#CLIENT_ID_CONFIG}.
     *
     * @param clientId value to store
     * @return this builder
     * @throws NullPointerException if {@code clientId} is {@code null}
     */
    public ProducerBuilder<K, V> clientId(final String clientId) {
        return set(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    }

    /**
     * Sets {@link ProducerConfig#SEND_BUFFER_CONFIG}.
     *
     * @param sendBufferBytes value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> sendBufferBytes(final int sendBufferBytes) {
        return set(ProducerConfig.SEND_BUFFER_CONFIG, sendBufferBytes);
    }

    /**
     * Sets {@link ProducerConfig#RECEIVE_BUFFER_CONFIG}.
     *
     * @param receiveBufferBytes value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> receiveBufferBytes(final int receiveBufferBytes) {
        return set(ProducerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);
    }

    /**
     * Sets {@link ProducerConfig#MAX_REQUEST_SIZE_CONFIG}.
     *
     * @param maxRequestSize value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> maxRequestSize(final int maxRequestSize) {
        return set(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    }

    /**
     * Sets {@link ProducerConfig#RECONNECT_BACKOFF_MS_CONFIG}.
     *
     * @param reconnectBackoffMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> reconnectBackoffMs(final long reconnectBackoffMs) {
        return set(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs);
    }

    /**
     * Sets {@link ProducerConfig#RECONNECT_BACKOFF_MAX_MS_CONFIG}.
     *
     * @param reconnectBackoffMaxMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> reconnectBackoffMaxMs(final long reconnectBackoffMaxMs) {
        return set(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMaxMs);
    }

    /**
     * Sets {@link ProducerConfig#MAX_BLOCK_MS_CONFIG}.
     *
     * @param maxBlockMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> maxBlockMs(final long maxBlockMs) {
        return set(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs);
    }

    /**
     * Sets {@link ProducerConfig#BUFFER_MEMORY_CONFIG}.
     *
     * @param bufferMemory value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> bufferMemory(final long bufferMemory) {
        return set(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    }

    /**
     * Sets {@link ProducerConfig#RETRY_BACKOFF_MS_CONFIG}.
     *
     * @param retryBackoffMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> retryBackoffMs(final long retryBackoffMs) {
        return set(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
    }

    /**
     * Sets {@link ProducerConfig#COMPRESSION_TYPE_CONFIG}.
     *
     * @param compressionType value to store
     * @return this builder
     * @throws NullPointerException if {@code compressionType} is {@code null}
     */
    public ProducerBuilder<K, V> compressionType(final String compressionType) {
        return set(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
    }

    /**
     * Sets {@link ProducerConfig#METRICS_SAMPLE_WINDOW_MS_CONFIG}.
     *
     * @param metricsSampleWindowMS value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> metricsSampleWindowMS(final long metricsSampleWindowMS) {
        return set(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, metricsSampleWindowMS);
    }

    /**
     * Sets {@link ProducerConfig#METRICS_NUM_SAMPLES_CONFIG}.
     *
     * @param metricsNumSamples value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> metricsNumSamples(final int metricsNumSamples) {
        return set(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, metricsNumSamples);
    }

    /**
     * Sets {@link ProducerConfig#METRICS_RECORDING_LEVEL_CONFIG}.
     *
     * @param metricsRecordingLevel value to store
     * @return this builder
     * @throws NullPointerException if {@code metricsRecordingLevel} is {@code null}
     */
    public ProducerBuilder<K, V> metricsRecordingLevel(final String metricsRecordingLevel) {
        return set(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, metricsRecordingLevel);
    }

    /**
     * Sets {@link ProducerConfig#METRIC_REPORTER_CLASSES_CONFIG}; the list is stored as given.
     *
     * <p>NOTE(review): the key name suggests this setting expects classes or class names,
     * while this method stores reporter instances - confirm the producer accepts them.
     *
     * @param metricReporters value to store
     * @return this builder
     * @throws NullPointerException if {@code metricReporters} is {@code null}
     */
    public ProducerBuilder<K, V> metricReporters(final List<MetricsReporter> metricReporters) {
        return set(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, metricReporters);
    }

    /**
     * Sets {@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}.
     *
     * @param maxInFlightRequestsPerConnection value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> maxInFlightRequestsPerConnection(final int maxInFlightRequestsPerConnection) {
        return set(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection);
    }

    /**
     * Sets {@link ProducerConfig#RETRIES_CONFIG}.
     *
     * @param retries value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> retries(final int retries) {
        return set(ProducerConfig.RETRIES_CONFIG, retries);
    }

    /**
     * Sets {@link ProducerConfig#KEY_SERIALIZER_CLASS_CONFIG}.
     *
     * @param keySerializer serializer class whose type argument matches this builder's {@code K}
     * @return this builder
     * @throws NullPointerException if {@code keySerializer} is {@code null}
     */
    public ProducerBuilder<K, V> keySerializer(final Class<? extends Serializer<K>> keySerializer) {
        return set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
    }

    /**
     * Sets {@link ProducerConfig#VALUE_SERIALIZER_CLASS_CONFIG}.
     *
     * @param valueSerializer serializer class whose type argument matches this builder's {@code V}
     * @return this builder
     * @throws NullPointerException if {@code valueSerializer} is {@code null}
     */
    public ProducerBuilder<K, V> valueSerializer(final Class<? extends Serializer<V>> valueSerializer) {
        return set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
    }

    /**
     * Sets {@link ProducerConfig#CONNECTIONS_MAX_IDLE_MS_CONFIG}.
     *
     * @param connectionsMaxIdleMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> connectionsMaxIdleMs(final long connectionsMaxIdleMs) {
        return set(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionsMaxIdleMs);
    }

    /**
     * Sets {@link ProducerConfig#PARTITIONER_CLASS_CONFIG}.
     *
     * @param partitionerClass value to store
     * @return this builder
     * @throws NullPointerException if {@code partitionerClass} is {@code null}
     */
    public ProducerBuilder<K, V> partitionerClass(final Class<? extends Partitioner> partitionerClass) {
        return set(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionerClass);
    }

    /**
     * Sets {@link ProducerConfig#REQUEST_TIMEOUT_MS_CONFIG}.
     *
     * @param requestTimeoutMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> requestTimeoutMs(final int requestTimeoutMs) {
        return set(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
    }

    /**
     * Sets {@link ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}; the list is stored as given.
     *
     * <p>NOTE(review): the key name suggests this setting expects classes or class names,
     * while this method stores interceptor instances - confirm the producer accepts them.
     *
     * @param interceptorClasses value to store
     * @return this builder
     * @throws NullPointerException if {@code interceptorClasses} is {@code null}
     */
    public ProducerBuilder<K, V> interceptorClasses(final List<ProducerInterceptor<K, V>> interceptorClasses) {
        return set(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClasses);
    }

    /**
     * Sets {@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG}.
     *
     * @param enableIdempotence value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> enableIdempotence(final boolean enableIdempotence) {
        return set(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence);
    }

    /**
     * Sets {@link ProducerConfig#TRANSACTION_TIMEOUT_CONFIG}.
     *
     * @param transactionTimeoutMs value to store
     * @return this builder
     */
    public ProducerBuilder<K, V> transactionTimeoutMs(final int transactionTimeoutMs) {
        return set(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
    }

    /**
     * Sets {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG}.
     *
     * @param transactionalId value to store
     * @return this builder
     * @throws NullPointerException if {@code transactionalId} is {@code null}
     */
    public ProducerBuilder<K, V> transactionalId(final String transactionalId) {
        return set(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
    }

    /**
     * Creates a new {@link KafkaProducer} from the settings collected so far.
     *
     * <p>Each call constructs a new producer; the builder keeps its settings afterwards.
     *
     * @return a producer typed with this builder's {@code K} and {@code V}
     */
    public Producer<K, V> build() {
        // Diamond instead of the raw type so the result is checked as Producer<K, V>.
        return new KafkaProducer<>(producerConfig);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment