The storm-kafka configs were taken from `SpoutConfig` and `KafkaConfig`.
The storm-kafka-client spout configurations were taken from `KafkaSpoutConfig`.

| Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help |
|---|---|---|
| Setting: `startOffsetTime`. Default: `EarliestTime`. Setting: `forceFromStart`. Default: `false`. `startOffsetTime` and `forceFromStart` together determine where consumer offsets are read from: ZooKeeper, the beginning, or the end of the Kafka stream. | Setting: `FirstPollOffsetStrategy`. Default: `UNCOMMITTED_EARLIEST`. Refer to this gist for choosing the right `FirstPollOffsetStrategy` based on your `startOffsetTime` and `forceFromStart` settings. | Import package: `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.<strategy-name>`. Usage: `<KafkaSpoutConfig-Builder>.setFirstPollOffsetStrategy(<strategy-name>)` |
| Setting: `scheme`. The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into a Storm tuple. Default: `RawMultiScheme`. | Deserializers | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)` and `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)` |
| Setting: `fetchSizeBytes`. Message fetch size: the number of bytes to attempt to fetch in one request to a Kafka server. Default: 1MB. | Kafka config: `max.partition.fetch.bytes`. Default: 1MB. | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, <int-value>)` |
| Setting: `bufferSizeBytes`. Buffer size (in bytes) for network requests, i.e. the buffer the consumer uses when pulling data from the producer (may be similar to `SocketBufferSize`). Default: 1MB. | Kafka config: `receive.buffer.bytes`. The size of the TCP receive buffer (`SO_RCVBUF`) to use when reading data. If the value is -1, the OS default is used. | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, <int-value>)` |
| Setting: `socketTimeoutMs`. Default: 10000. | Kafka config: `socket.timeout.ms`. Default: 30000. | Import package: `import kafka.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.SocketTimeout, <int-value>)` |
| Setting: `useStartOffsetTimeIfOffsetOutOfRange`. Default: `true`. | Kafka config: `auto.offset.reset`. Possible values: `"latest"`, `"earliest"`, `"none"`. Default: `latest`. Exception: `earliest` if `ProcessingGuarantee` is set to `AT_LEAST_ONCE`. | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <String>)` |
| Setting: `fetchMaxWait`. Maximum time in ms to wait for the response. Default: 10000. | Kafka config: `fetch.max.wait.ms`. Default: 500. | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, <value>)` |
| Setting: `minFetchByte` | Kafka config: `fetch.min.bytes`. Default: 1. | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, <value>)` |
| Setting: `maxOffsetBehind`. Specifies how long a spout attempts to retry the processing of a failed tuple. For example, when a failing tuple's offset is more than `maxOffsetBehind` behind the acked offset, the spout stops retrying the tuple. Default: `Long.MAX_VALUE`. | No equivalent in storm-kafka-client as of now | |
| Setting: `clientId` | Kafka config: `client.id` | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;` Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.CLIENT_ID_CONFIG, <String>)` |

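
As a rough illustration of the Kafka config column, the sketch below collects the consumer properties named in the table into a plain `java.util.Map`. It uses the literal property names rather than the `ConsumerConfig` constants so that it runs without Kafka or Storm on the classpath. The class name `MigratedConsumerProps`, the client id `my-storm-spout`, and the choice of `StringDeserializer` are assumptions made for illustration; the numeric values are the defaults listed in the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MigratedConsumerProps {

    // Collects the Kafka consumer properties that stand in for the old
    // SpoutConfig fields. Hypothetical helper, not part of storm-kafka-client.
    static Map<String, Object> build(String clientId) {
        Map<String, Object> props = new LinkedHashMap<>();
        // scheme -> key/value deserializers (StringDeserializer is an assumed choice)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // fetchSizeBytes -> max.partition.fetch.bytes (1MB)
        props.put("max.partition.fetch.bytes", 1024 * 1024);
        // bufferSizeBytes -> receive.buffer.bytes (-1 asks for the OS default)
        props.put("receive.buffer.bytes", -1);
        // useStartOffsetTimeIfOffsetOutOfRange -> auto.offset.reset
        props.put("auto.offset.reset", "earliest");
        // fetchMaxWait -> fetch.max.wait.ms
        props.put("fetch.max.wait.ms", 500);
        // minFetchByte -> fetch.min.bytes
        props.put("fetch.min.bytes", 1);
        // clientId -> client.id
        props.put("client.id", clientId);
        return props;
    }

    public static void main(String[] args) {
        // Print each property as "name = value".
        build("my-storm-spout").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With storm-kafka-client on the classpath, each entry would instead be passed through `<KafkaSpoutConfig-Builder>.setProp(...)` as shown in the usage column.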
Important document with config explanations:
Thanks, I think this is nearly there. The maxOffsetBehind section says that "If a failing tuple's offset is less than maxOffsetBehind, the spout stops retrying the tuple.". Shouldn't it be "more than"? For example, if the latest offset is 100, you set maxOffsetBehind to 50, and offset 30 then fails, 30 is more than maxOffsetBehind behind the latest offset, so it is not retried.
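
The arithmetic in that example can be written down as a small sketch. This is not the storm-kafka implementation; the class `MaxOffsetBehindCheck` and the method `shouldRetry` are hypothetical names, and the comparison simply encodes the "more than maxOffsetBehind behind" reading proposed above.

```java
public class MaxOffsetBehindCheck {

    // Returns true while the failed offset is no more than maxOffsetBehind
    // behind the latest offset, i.e. while it would still be retried
    // under the reading proposed in the review comment.
    static boolean shouldRetry(long failedOffset, long latestOffset, long maxOffsetBehind) {
        long behind = latestOffset - failedOffset;
        return behind <= maxOffsetBehind;
    }

    public static void main(String[] args) {
        // Offset 30 is 70 behind 100, which exceeds the limit of 50.
        System.out.println(shouldRetry(30, 100, 50));
        // Offset 60 is 40 behind 100, which is within the limit of 50.
        System.out.println(shouldRetry(60, 100, 50));
        // With the default of Long.MAX_VALUE the limit is never exceeded.
        System.out.println(shouldRetry(30, 100, Long.MAX_VALUE));
    }
}
```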
Regarding the links, I think we should try to use links that automatically point at the right release. There's some documentation about it at https://github.com/apache/storm-site#how-release-specific-docs-work, and an example of the usage is "The allowed values are listed in the FirstPollOffsetStrategy javadocs" (from https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md). It would be great if you could fix any broken links you find, as well as any links that are hard-coded to point at a specific release.