@srishtyagrawal
Last active April 19, 2018 00:41

The storm-kafka configs were taken from SpoutConfig and KafkaConfig; the storm-kafka-client spout configurations were taken from KafkaSpoutConfig. The table below maps each storm-kafka setting to its storm-kafka-client equivalent.

| Storm-0.9.6 `SpoutConfig` | Storm-1.0.6 `KafkaSpoutConfig` name | `KafkaSpoutConfig` usage help |
| --- | --- | --- |
| `startOffsetTime`<br>Default: `EarliestTime`<br><br>`forceFromStart`<br>Default: `false`<br><br>`startOffsetTime` and `forceFromStart` together determine where consumer offsets are read from: ZooKeeper, the beginning, or the end of the Kafka stream | `FirstPollOffsetStrategy`<br>Default: `UNCOMMITTED_EARLIEST` | Refer to this gist for choosing the right `FirstPollOffsetStrategy` based on your `startOffsetTime` and `forceFromStart` settings<br><br>Import package: `org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.<strategy-name>`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setFirstPollOffsetStrategy(<strategy-name>)` |
| `scheme`<br><br>The interface that specifies how a `ByteBuffer` from a Kafka topic is transformed into a Storm tuple<br>Default: `RawMultiScheme` | Deserializers | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)`<br>`<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)` |
| `fetchSizeBytes`<br><br>Message fetch size: the number of bytes to attempt to fetch in one request to a Kafka server<br>Default: 1MB | Kafka config: `max.partition.fetch.bytes`<br>Default: 1MB | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, <int-value>)` |
| `bufferSizeBytes`<br><br>Buffer size (in bytes) for network requests: the buffer size the consumer uses for pulling data from the broker (similar to `SocketBufferSize`)<br>Default: 1MB | Kafka config: `receive.buffer.bytes`<br><br>The size of the TCP receive buffer (`SO_RCVBUF`) to use when reading data. If the value is -1, the OS default will be used | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, <int-value>)` |
| `socketTimeoutMs`<br>Default: 10000 | Kafka config: `socket.timeout.ms`<br>Default: 30000 | Import package: `import kafka.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.SocketTimeout, <int-value>)` |
| `useStartOffsetTimeIfOffsetOutOfRange`<br>Default: `true` | Kafka config: `auto.offset.reset`<br><br>Possible values: `"latest"`, `"earliest"`, `"none"`<br>Default: `latest`; exception: `earliest` if `ProcessingGuarantee` is set to `AT_LEAST_ONCE` | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <String>)` |
| `fetchMaxWait`<br><br>Maximum time in ms to wait for the fetch response<br>Default: 10000 | Kafka config: `fetch.max.wait.ms`<br>Default: 500 | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, <value>)` |
| `minFetchByte` | Kafka config: `fetch.min.bytes`<br>Default: 1 | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, <value>)` |
| `maxOffsetBehind`<br><br>Controls how long the spout keeps retrying a failed tuple; for example, when a failed tuple's offset is more than `maxOffsetBehind` behind the acked offset, the spout stops retrying the tuple<br>Default: `Long.MAX_VALUE` | No equivalent in storm-kafka-client as of now | |
| `clientId` | Kafka config: `client.id` | Import package: `import org.apache.kafka.clients.consumer.ConsumerConfig;`<br><br>Usage: `<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.CLIENT_ID_CONFIG, <String>)` |
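Putting several of the rows above together, here is a minimal sketch of a migrated spout setup using the storm-kafka-client builder API from recent 1.x releases. The broker address, topic name, and client id are placeholders, and the explicit deserializer calls just mirror the `scheme` row (the `String` builder already defaults to `StringDeserializer`):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

public class MigratedSpoutExample {
    public static KafkaSpout<String, String> buildSpout() {
        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                .builder("localhost:9092", "my-topic") // placeholder broker and topic
                // startOffsetTime + forceFromStart -> FirstPollOffsetStrategy
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                // scheme -> key/value deserializers (redundant here, shown for the mapping)
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                // fetchSizeBytes -> max.partition.fetch.bytes (1MB, matching the old default)
                .setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024)
                // bufferSizeBytes -> receive.buffer.bytes
                .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024)
                // clientId -> client.id (placeholder value)
                .setProp(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id")
                .build();
        return new KafkaSpout<>(conf);
    }
}
```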

Important document with config explanations:

@srdo commented Apr 14, 2018

The scheme corresponds to both ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, since I think some schemes (e.g. https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java) also include the message key.

I think fetchSizeBytes may correspond to max.partition.fetch.bytes in the new consumer, since the fetchSizeBytes parameter as used here https://github.com/apache/storm/blob/8ffa920d3894634aa078f0fdf6b02d270262caf4/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java#L197 seems to apply per partition in the fetch request (see https://github.com/apache/kafka/blob/0.8/core/src/main/scala/kafka/api/FetchRequest.scala#L177 and the max_bytes field in https://kafka.apache.org/protocol#The_Messages_Fetch).

The auto.offset.reset setting default depends on the picked ProcessingGuarantee. See https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L471. We made this change because defaulting to "latest" is a bit of a trap to leave in place for people who want at-least-once processing. We default to "earliest" if the ProcessingGuarantee is at-least-once, "latest" otherwise. I get that even with "earliest", we can't offer at-least-once if the messages get deleted in Kafka, but I figured it was better to skip as few messages as possible.
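(For illustration, a minimal sketch of how the guarantee and an explicit override interact, assuming the master-branch Builder API linked above; the broker address and topic are placeholders, and an explicit setProp should take precedence over the guarantee-derived default:)

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

public class OffsetResetExample {
    public static KafkaSpoutConfig<String, String> build() {
        return KafkaSpoutConfig
                .builder("localhost:9092", "my-topic") // placeholders
                // AT_LEAST_ONCE makes the spout default auto.offset.reset to "earliest"
                .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                // an explicit setting overrides the guarantee-derived default
                .setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .build();
    }
}
```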

The fetchMaxWait setting is documented at https://kafka.apache.org/protocol#The_Messages_Fetch as "maxWaitTime".

I think the maxOffsetBehind setting works slightly differently than described. First (https://github.com/apache/storm/blob/659b59d0339d304140abab25ff6a92a7054a35e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L148), if the ZooKeeper offset is farther behind the starting offset (as set by startOffsetTime) than maxOffsetBehind, the spout will ignore the ZooKeeper offset and skip up to the starting offset. We currently don't have an equivalent to startOffsetTime in the new spout; it's tracked at https://issues.apache.org/jira/browse/STORM-2992

Second, the maxOffsetBehind setting also causes the spout to skip tuples that fail when they are more than maxOffsetBehind behind the newest acked offset. When an offset is acked, any offset pending retry that is more than maxOffsetBehind behind it is discarded. When an offset is failed, it is discarded if it is more than maxOffsetBehind behind. There's also a case where the spout will crash because there are too many failed offsets; see the ack and fail methods at https://github.com/apache/storm/blob/659b59d0339d304140abab25ff6a92a7054a35e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java#L291.
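(For reference, a sketch of how these fields are set on the old storm-kafka spout; the ZooKeeper address, topic, ZK root, spout id, and the 50k threshold are placeholders:)

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

public class OldSpoutExample {
    public static SpoutConfig build() {
        BrokerHosts hosts = new ZkHosts("localhost:2181"); // placeholder ZK address
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-offsets", "my-spout-id");
        // start from the earliest offset available in Kafka
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
        // discard failed tuples more than 50k offsets behind the newest acked offset
        spoutConfig.maxOffsetBehind = 50_000;
        return spoutConfig;
    }
}
```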

maxOffsetBehind doesn't correspond to maxUncommittedOffsets. With maxUncommittedOffsets, the new spout will stop requesting new tuples if there are more than maxUncommittedOffsets pending tuples, until some tuples can be committed (note that failed tuples will always be allowed to retry). I don't believe there's a direct equivalent between maxOffsetBehind and anything in the new spout. The RetryService can only decide to retry or not retry messages based on their own offsets (e.g. set a maximum number of retries for each message), not based on other acked offsets. If we need to support maxOffsetBehind-like behavior, we probably need to add a method to the RetryService telling it when tuples are acked.
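(As a concrete illustration of the per-message alternative, here is a sketch using the KafkaSpoutRetryExponentialBackoff that ships with storm-kafka-client; the broker address, topic, intervals, and caps are made-up values:)

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

public class RetryExample {
    public static KafkaSpoutConfig<String, String> build() {
        // retry each failed offset at most 5 times, backing off from 500 ms up to 10 s
        KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(500), // initial delay
                TimeInterval.milliSeconds(500), // delay period for the backoff
                5,                              // max retries per offset
                TimeInterval.seconds(10));      // max delay between retries
        return KafkaSpoutConfig
                .builder("localhost:9092", "my-topic") // placeholders
                .setRetry(retry)
                // stop polling for new tuples past 10k pending (uncommitted) offsets
                .setMaxUncommittedOffsets(10_000)
                .build();
    }
}
```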

@srishtyagrawal (Author) commented Apr 16, 2018

Thanks a lot, Stig, for the comments; they are very useful and informative.

scheme: Addressed. I had missed the KeyValueScheme and hence didn't add the key deserializer; thanks for noticing that.

fetchSizeBytes: Addressed. The third link (with the algorithm) was useful for understanding why fetchSizeBytes is analogous to max.partition.fetch.bytes and not fetch.max.bytes. Another thing to note is that the default for fetchSizeBytes is the same as for max.partition.fetch.bytes (which supports your theory).

auto.offset.reset: I have modified the default to have an exception depending on the ProcessingGuarantee configuration variable. Can you check if this looks good now?

fetchMaxWait: Thanks, I have added the documentation to the table.

maxOffsetBehind: Thanks for the elaborate explanation. I understand this configuration much better now.

I am creating a pull request to add this table to the storm-kafka-client documentation, but before I do that I have another question about the links in this gist. Currently all the links reference the Storm-1.0.6 release. Is it OK if I modify these links to point to the latest release, v1.2.1 (the current links in the storm-kafka-client document are broken too)?

@srdo commented Apr 17, 2018

Thanks, I think this is nearly there. The maxOffsetBehind section says that "If a failing tuple's offset is less than maxOffsetBehind, the spout stops retrying the tuple." Shouldn't it be more than? I.e., if the latest offset is 100, and you set maxOffsetBehind to 50, and offset 30 then fails, 30 is more than maxOffsetBehind behind the latest offset, so it is not retried.

Regarding the links, I think we should try to use links that automatically point at the right release. There's some documentation about it here: https://github.com/apache/storm-site#how-release-specific-docs-work, and example usage: "The allowed values are listed in the FirstPollOffsetStrategy javadocs" (from https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md). It would be great if you could fix any broken links you find, or any links that are hard-coded to point at a specific release.

@srishtyagrawal (Author) commented:
I copied the maxOffsetBehind documentation from here. It is confusing because, in your earlier example, the value 30 itself is less than 100-50, but I like the idea of adding "behind" to make it clearer. As there is more than one scenario where maxOffsetBehind is used, I have modified the documentation to cite the fail scenario as an example.
Thanks for the documentation on links; I will fix all the existing links and the ones that are currently broken in the storm-kafka-client documentation.

@srishtyagrawal (Author) commented Apr 17, 2018

It seems that none of the release-related links in storm-kafka-client.md work. I looked at other docs as well, for example Hooks.md, Concepts.md, Configuration.md, and Common-patterns.md (the first four documents I checked for relative links), and these links gave a 404 there too. I have yet to figure out why the relative links don't work.
