The `storm-kafka` spout configs were taken from `SpoutConfig` and `KafkaConfig`. The `storm-kafka-client` spout configurations were taken from `KafkaSpoutConfig`.
Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help
---|---|---
**Setting:** `startOffsetTime`<br>**Default:** `EarliestTime`<br><br>**Setting:** `forceFromStart`<br>**Default:** `false`<br><br>`startOffsetTime` & `forceFromStart` together determine where consumer offsets are read from: ZooKeeper, the beginning, or the end of the Kafka stream | **Setting:** `FirstPollOffsetStrategy`<br>**Default:** `UNCOMMITTED_EARLIEST`<br><br>Refer to this gist for choosing the right `FirstPollOffsetStrategy` based on your `startOffsetTime` & `forceFromStart` settings | **Import package:** `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.<strategy-name>`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setFirstPollOffsetStrategy(<strategy-name>)`
**Setting:** `scheme`<br>The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into a Storm tuple<br>**Default:** `RawMultiScheme` | Deserializers | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)`<br>`<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)`
**Setting:** `fetchSizeBytes`<br>Message fetch size: the number of bytes to attempt to fetch in one request to a Kafka server<br>**Default:** 1MB | **Kafka config:** `max.partition.fetch.bytes`<br>**Default:** 1MB | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, <int-value>)`
**Setting:** `bufferSizeBytes`<br>Buffer size (in bytes) for network requests; the buffer size the consumer has for pulling data from the broker (may be similar to SocketBufferSize)<br>**Default:** 1MB | **Kafka config:** `receive.buffer.bytes`<br>The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, <int-value>)`
**Setting:** `socketTimeoutMs`<br>**Default:** 10000 | **Kafka config:** `socket.timeout.ms`<br>**Default:** 30000 | **Import package:** `import kafka.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.SocketTimeout, <int-value>)`
**Setting:** `useStartOffsetTimeIfOffsetOutOfRange`<br>**Default:** `true` | **Kafka config:** `auto.offset.reset`<br>**Possible values:** `"latest"`, `"earliest"`, `"none"`<br>**Default:** `latest`; exception: `earliest` if `ProcessingGuarantee` is set to `AT_LEAST_ONCE` | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <String>)`
**Setting:** `fetchMaxWait`<br>Maximum time in ms to wait for the response<br>**Default:** 10000 | **Kafka config:** `fetch.max.wait.ms`<br>**Default:** 500 | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, <value>)`
**Setting:** `minFetchByte` | **Kafka config:** `fetch.min.bytes`<br>**Default:** 1 | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, <value>)`
**Setting:** `maxOffsetBehind`<br>Specifies how long a spout attempts to retry the processing of a failed tuple. For example, when a failing tuple's offset is more than `maxOffsetBehind` behind the acked offset, the spout stops retrying the tuple.<br>**Default:** `Long.MAX_VALUE` | No equivalent in storm-kafka-client as of now | 
**Setting:** `clientId` | **Kafka config:** `client.id` | **Import package:** `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br>**Usage:** `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.CLIENT_ID_CONFIG, <String>)`
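Putting the mappings above together, a migration of the old `SpoutConfig` knobs to the new builder might look like the following sketch. The broker address, topic, and client id are placeholder values, and the numeric values simply mirror the defaults listed in the table:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

public class MigratedSpoutConfig {
    public static KafkaSpoutConfig<String, String> build() {
        return KafkaSpoutConfig.builder("broker1:9092", "my-topic") // placeholders
                // startOffsetTime/forceFromStart -> FirstPollOffsetStrategy
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                // fetchSizeBytes -> max.partition.fetch.bytes (1MB)
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024)
                // bufferSizeBytes -> receive.buffer.bytes (1MB)
                .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024)
                // minFetchByte -> fetch.min.bytes
                .setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1)
                // fetchMaxWait -> fetch.max.wait.ms
                .setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)
                // clientId -> client.id (placeholder value)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG, "my-spout-client")
                .build();
    }
}
```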
Notes explaining the configs above:
The scheme corresponds to both ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, since I think some schemes (e.g. https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java) also include the message key.
I think fetchSizeBytes may correspond to max.partition.fetch.bytes in the new consumer, since the fetchSizeBytes parameter as used here https://github.com/apache/storm/blob/8ffa920d3894634aa078f0fdf6b02d270262caf4/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java#L197 seems to apply per partition in the fetch request (see https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/api/FetchRequest.scala#L177 and the max_bytes field in https://kafka.apache.org/protocol#The_Messages_Fetch).
The auto.offset.reset default depends on the chosen ProcessingGuarantee. See https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L471. We made this change because defaulting to "latest" is a bit of a trap to leave in place for people who want at-least-once processing. We default to "earliest" if the ProcessingGuarantee is at-least-once, and "latest" otherwise. I get that even with "earliest" we can't offer at-least-once if the messages get deleted in Kafka, but I figured it was better to skip as few messages as possible.
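A minimal sketch of that defaulting behavior from the builder's side (the broker and topic names are placeholders; the commented-out setProp is only needed if you want to override the guarantee-derived default):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
        .builder("broker1:9092", "my-topic")                       // placeholder broker/topic
        .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE) // implies auto.offset.reset=earliest
        // Uncomment to explicitly override the guarantee-derived default:
        // .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        .build();
```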
The fetchMaxWait setting is documented at https://kafka.apache.org/protocol#The_Messages_Fetch as "maxWaitTime".
I think the maxOffsetBehind setting works slightly differently than described. First (https://github.com/apache/storm/blob/659b59d0339d304140abab25ff6a92a7054a35e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L148), if the Zookeeper offset is farther behind the starting offset (as set by startOffsetTime) than maxOffsetBehind, the spout will ignore the Zookeeper offset and skip up to the starting offset. We currently don't have an equivalent to startOffsetTime in the new spout; this is tracked at https://issues.apache.org/jira/browse/STORM-2992
Second, the maxOffsetBehind setting also causes the spout to skip failed tuples that are more than maxOffsetBehind behind the newest acked offset. When an offset is acked, any offset pending retry that is more than maxOffsetBehind behind it is discarded. When an offset is failed, it is likewise discarded if it is more than maxOffsetBehind behind the acked offset. There's also a case where the spout will crash because there are too many failed offsets; see the ack and fail methods at https://github.com/apache/storm/blob/659b59d0339d304140abab25ff6a92a7054a35e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L291.
maxOffsetBehind doesn't correspond to maxUncommittedOffsets. With maxUncommittedOffsets, the new spout stops requesting new tuples once more than maxUncommittedOffsets tuples are pending, until some tuples can be committed (note that failed tuples will always be allowed to retry). I don't believe there's a direct equivalent to maxOffsetBehind in the new spout. The RetryService can only decide whether to retry messages based on their own offsets (e.g. by setting a maximum number of retries per message), not based on other acked offsets. If we need to support maxOffsetBehind-like behavior, we probably need to add a method to the RetryService telling it when tuples are acked.
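Lacking a direct equivalent, the closest existing knob today is the per-message retry cap on the spout's RetryService. A sketch, assuming the stock KafkaSpoutRetryExponentialBackoff implementation (all interval and count values below are placeholders):

```java
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

// Per-message retry cap: retry each failed tuple at most 5 times with
// exponential backoff, regardless of how far behind the acked offset it is.
KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
        TimeInterval.microSeconds(500), // initial delay before the first retry
        TimeInterval.milliSeconds(2),   // delay period (backoff base)
        5,                              // max retries per message (placeholder)
        TimeInterval.seconds(10));      // cap on the backoff delay
```

The service is then passed to the builder via `<KafkaSpoutConfig-Builder>.setRetry(retryService)`.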