The `storm-kafka` configs were taken from `SpoutConfig` and `KafkaConfig`. The `storm-kafka-client` spout configurations were taken from `KafkaSpoutConfig`.
| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
| --- | --- | --- |
| **Setting:** `startOffsetTime` <br> **Default:** `EarliestTime` <br><br> `forceFromStart` <br> **Default:** `false` <br><br> `startOffsetTime` & `forceFromStart` together determine where consumer offsets are read from: ZooKeeper, the beginning, or the end of the Kafka stream | **Setting:** `FirstPollOffsetStrategy` <br> **Default:** `UNCOMMITTED_EARLIEST` <br><br> Refer to this gist for choosing the right `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.<strategy-name>` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setFirstPollOffsetStrategy(<strategy-name>)` |
| **Setting:** `scheme` <br> The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into a Storm tuple <br> **Default:** `RawMultiScheme` | Deserializers | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)` <br> `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)` |
| **Setting:** `fetchSizeBytes` <br> Message fetch size: the number of bytes to attempt to fetch in one request to a Kafka server <br> **Default:** 1MB | **Kafka config:** `max.partition.fetch.bytes` <br> **Default:** 1MB | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, <int-value>)` |
| **Setting:** `bufferSizeBytes` <br> Buffer size (in bytes) for network requests: the buffer size the consumer has for pulling data from the producer (may be similar to SocketBufferSize) <br> **Default:** 1MB | **Kafka config:** `receive.buffer.bytes` <br> The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, <int-value>)` |
| **Setting:** `socketTimeoutMs` <br> **Default:** 10000 | **Kafka config:** `socket.timeout.ms` <br> **Default:** 30000 | **Import package:** `import kafka.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.SocketTimeout, <int-value>)` |
| **Setting:** `useStartOffsetTimeIfOffsetOutOfRange` <br> **Default:** `true` | **Kafka config:** `auto.offset.reset` <br> **Possible values:** `"latest"`, `"earliest"`, `"none"` <br> **Default:** `latest`. Exception: `earliest` if `ProcessingGuarantee` is set to `AT_LEAST_ONCE` | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <String>)` |
| **Setting:** `fetchMaxWait` <br> Maximum time in ms to wait for the response <br> **Default:** 10000 | **Kafka config:** `fetch.max.wait.ms` <br> **Default:** 500 | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, <value>)` |
| **Setting:** `minFetchByte` | **Kafka config:** `fetch.min.bytes` <br> **Default:** 1 | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, <value>)` |
| **Setting:** `maxOffsetBehind` <br> Specifies how long a spout attempts to retry the processing of a failed tuple. In one scenario, when a failing tuple's offset is more than `maxOffsetBehind` behind the acked offset, the spout stops retrying the tuple. <br> **Default:** `Long.MAX_VALUE` | No equivalent in storm-kafka-client as of now | |
| **Setting:** `clientId` | **Kafka config:** `client.id` | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;` <br> **Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.CLIENT_ID_CONFIG, <String>)` |
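To tie the table together, here is a minimal sketch (not from the original gist) of how several of the storm-kafka-client equivalents above would be applied through the `KafkaSpoutConfig` builder in the Storm 1.2.x API. The broker address, topic name, and client id are placeholder values.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

public class SpoutConfigExample {
    public static KafkaSpoutConfig<String, String> buildConfig() {
        return KafkaSpoutConfig
            // Placeholder bootstrap servers and topic name
            .builder("broker1:9092", "my-topic")
            // Replaces startOffsetTime & forceFromStart from storm-kafka
            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            // Equivalent of fetchSizeBytes (default 1MB)
            .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576)
            // Equivalent of bufferSizeBytes (TCP receive buffer; -1 = OS default)
            .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1048576)
            // Equivalent of useStartOffsetTimeIfOffsetOutOfRange
            .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            // Equivalents of fetchMaxWait and minFetchByte
            .setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)
            .setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1)
            // Equivalent of clientId
            .setProp(ConsumerConfig.CLIENT_ID_CONFIG, "my-storm-spout")
            .build();
    }
}
```

The resulting config would then be passed to `new KafkaSpout<>(buildConfig())` when declaring the spout in the topology.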
Important document with config explanations:
Thanks a lot, Stig, for the comments; they are very useful and informative.

- **scheme:** Addressed. I missed `KeyValueScheme` and hence didn't add `KEY_DESERIALIZER`; thanks for noticing that.
- **fetchSizeBytes:** Addressed. The third link (with the algorithm) was useful in understanding why `fetchSizeBytes` is analogous to `max.partition.fetch.bytes` and not `fetch.max.bytes`. Another thing to note is that the default for `fetchSizeBytes` is the same as for `max.partition.fetch.bytes`, which reassures your theory.
- **auto.offset.reset:** I have modified the default to have an exception depending on the `ProcessingGuarantee` configuration variable. Can you check whether this looks good now?
- **fetchMaxWait:** Thanks, I have added the documentation to the table.
- **maxOffsetBehind:** Thanks for the elaborate explanation. I understand this configuration much better now.

I am creating a pull request to add this table to the `storm-kafka-client` documentation, but before I do that I have another question about the links in this gist. Currently all the links reference the Storm-1.0.6 release. Is it OK if I modify these links to point to the latest release version, `v1.2.1`? (The current links in the `storm-kafka-client` document are broken too.)