Created
July 13, 2018 09:16
-
-
Save matzew/b98dbacf7f40f27c9f666b736a2428d3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.kafka.clients.producer; | |
import org.apache.kafka.clients.producer.internals.DefaultPartitioner; | |
import org.apache.kafka.common.metrics.MetricsReporter; | |
import org.apache.kafka.common.serialization.IntegerSerializer; | |
import org.apache.kafka.common.serialization.Serializer; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
import java.util.List; | |
import java.util.Properties; | |
public class ProducerBuilder<K, V> { | |
final Properties producerConfig = new Properties(); | |
public ProducerBuilder bootstrapServers(final String bootstrapsServers) { | |
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapsServers); | |
return this; | |
} | |
public ProducerBuilder metadataMaxAgeMs(final long metadataMaxAgeMs) { | |
producerConfig.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, metadataMaxAgeMs); | |
return this; | |
} | |
public ProducerBuilder batchSize(final int batchSize) { | |
producerConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); | |
return this; | |
} | |
public ProducerBuilder acks(final String acks) { | |
producerConfig.put(ProducerConfig.ACKS_CONFIG, acks); | |
return this; | |
} | |
public ProducerBuilder lingerMs(final long lingerMs) { | |
producerConfig.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs); | |
return this; | |
} | |
public ProducerBuilder clientId(final String clientId) { | |
producerConfig.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); | |
return this; | |
} | |
public ProducerBuilder sendBufferBytes(final int sendBufferBytes) { | |
producerConfig.put(ProducerConfig.SEND_BUFFER_CONFIG, sendBufferBytes); | |
return this; | |
} | |
public ProducerBuilder receiveBufferBytes(final int receiveBufferBytes) { | |
producerConfig.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes); | |
return this; | |
} | |
public ProducerBuilder maxRequestSize(final int maxRequestSize) { | |
producerConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize); | |
return this; | |
} | |
public ProducerBuilder reconnectBackoffMs(final long reconnectBackoffMs) { | |
producerConfig.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, reconnectBackoffMs); | |
return this; | |
} | |
public ProducerBuilder reconnectBackoffMaxMs(final long reconnectBackoffMaxMs) { | |
producerConfig.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, reconnectBackoffMaxMs); | |
return this; | |
} | |
public ProducerBuilder maxBlockMs(final long maxBlockMs) { | |
producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs); | |
return this; | |
} | |
public ProducerBuilder bufferMemory(final long bufferMemory) { | |
producerConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); | |
return this; | |
} | |
public ProducerBuilder retryBackoffMs(final long retryBackoffMs) { | |
producerConfig.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs); | |
return this; | |
} | |
public ProducerBuilder compressionType(final String compressionType) { | |
producerConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType); | |
return this; | |
} | |
public ProducerBuilder metricsSampleWindowMS(final long metricsSampleWindowMS) { | |
producerConfig.put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, metricsSampleWindowMS); | |
return this; | |
} | |
public ProducerBuilder metricsNumSamples(final int metricsNumSamples) { | |
producerConfig.put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, metricsNumSamples); | |
return this; | |
} | |
public ProducerBuilder metricsRecordingLevel(final String metricsRecordingLevel) { | |
producerConfig.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, metricsRecordingLevel); | |
return this; | |
} | |
public ProducerBuilder metricReporters(final List<MetricsReporter> metricReporters) { | |
producerConfig.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, metricReporters); | |
return this; | |
} | |
public ProducerBuilder maxInFlightRequestsPerConnection(final int maxInFlightRequestsPerConnection) { | |
producerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlightRequestsPerConnection); | |
return this; | |
} | |
public ProducerBuilder retries(final int retries) { | |
producerConfig.put(ProducerConfig.RETRIES_CONFIG, retries); | |
return this; | |
} | |
public ProducerBuilder keySerializer(final Class<? extends Serializer<K>> keySerializer) { | |
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer); | |
return this; | |
} | |
public ProducerBuilder valueSerializer(final Class<? extends Serializer<V>> valueSerializer) { | |
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer); | |
return this; | |
} | |
public ProducerBuilder connectionsMaxIdleMs(final long connectionsMaxIdleMs) { | |
producerConfig.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionsMaxIdleMs); | |
return this; | |
} | |
public ProducerBuilder partitionerClass(final Class<? extends Partitioner> partitionerClass) { | |
producerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, partitionerClass); | |
return this; | |
} | |
public ProducerBuilder requestTimeoutMs(final int requestTimeoutMs) { | |
producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs); | |
return this; | |
} | |
public ProducerBuilder interceptorClasses(final List<ProducerInterceptor<K, V>> interceptorClasses) { | |
producerConfig.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptorClasses); | |
return this; | |
} | |
public ProducerBuilder enableIdempotence(final boolean enableIdempotence) { | |
producerConfig.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotence); | |
return this; | |
} | |
public ProducerBuilder transactionTimeoutMs(final int transactionTimeoutMs) { | |
producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); | |
return this; | |
} | |
public ProducerBuilder transactionalId(final String transactionalId) { | |
producerConfig.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); | |
return this; | |
} | |
public Producer<K, V> build() { | |
return new KafkaProducer(producerConfig); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment