@srishtyagrawal
Last active April 19, 2018 00:41

The storm-kafka configs were taken from SpoutConfig and KafkaConfig. storm-kafka-client spout configurations were taken from KafkaSpoutConfig.

Storm-0.9.6 SpoutConfig | Storm-1.0.6 KafkaSpoutConfig name | KafkaSpoutConfig usage help
Setting: startOffsetTime
Default: EarliestTime

Setting: forceFromStart
Default: false

startOffsetTime and forceFromStart together determine where the consumer starts reading: from the offsets stored in ZooKeeper, from the beginning of the Kafka stream, or from its end
Setting: FirstPollOffsetStrategy
Default: UNCOMMITTED_EARLIEST

Refer to this gist for choosing the right FirstPollOffsetStrategy based on your startOffsetTime & forceFromStart settings
Import package: org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.<strategy-name>

Usage: <KafkaSpoutConfig-Builder>.setFirstPollOffsetStrategy(<strategy-name>)
Setting: scheme

The interface that specifies how a ByteBuffer from a Kafka topic is transformed into a Storm tuple
Default: RawMultiScheme
Deserializers
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)

<KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, <deserializer-class>)
Setting: fetchSizeBytes

Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server
Default: 1MB
Kafka config: max.partition.fetch.bytes
Default: 1MB
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, <int-value>)
Setting: bufferSizeBytes

Buffer size (in bytes) for network requests, i.e. the buffer the consumer uses when pulling data from Kafka (may be similar to SocketBufferSize)
Default: 1MB
Kafka config: receive.buffer.bytes

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default will be used
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, <int-value>)
Setting: socketTimeoutMs

Default: 10000
Kafka config: socket.timeout.ms
Default: 30000
Import package: import kafka.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.SocketTimeout, <int-value>)
Setting: useStartOffsetTimeIfOffsetOutOfRange

Default: true
Kafka config: auto.offset.reset

Possible values: "latest", "earliest", "none"
Default: latest. Exception: earliest if ProcessingGuarantee is set to AT_LEAST_ONCE
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, <String>)
Setting: fetchMaxWait

Maximum time in ms to wait for the response
Default: 10000
Kafka config: fetch.max.wait.ms
Default: 500
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, <value>)
Setting: minFetchByte
Kafka config: fetch.min.bytes
Default: 1
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, <value>)
Setting: maxOffsetBehind

Specifies how far behind, in offsets, a spout will still retry a failed tuple. For example, when a failing tuple's offset is more than maxOffsetBehind behind the acked offset, the spout stops retrying the tuple.
Default: Long.MAX_VALUE
No equivalent in storm-kafka-client as of now
Setting: clientId
Kafka config: client.id
Import package: import org.apache.kafka.clients.consumer.ConsumerConfig;

Usage: <KafkaSpoutConfig-Builder>.setProp(ConsumerConfig.CLIENT_ID_CONFIG, <String>)
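As a rough illustration of the table above, the sketch below collects the Kafka consumer properties that correspond to some of the old storm-kafka SpoutConfig fields. It uses only the JDK, so the property keys are written as the plain strings behind the ConsumerConfig constants. The class name, method name, and parameter names are made up for this example and are not part of Storm or Kafka. socketTimeoutMs and maxOffsetBehind are deliberately left out.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only: maps old storm-kafka
// SpoutConfig values onto the Kafka consumer property keys from the table.
class SpoutConfigMigration {

    static Properties toConsumerProps(int fetchSizeBytes,
                                      int bufferSizeBytes,
                                      int fetchMaxWaitMs,
                                      int minFetchByte,
                                      String clientId) {
        Properties props = new Properties();
        // fetchSizeBytes -> ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG
        props.setProperty("max.partition.fetch.bytes", Integer.toString(fetchSizeBytes));
        // bufferSizeBytes -> ConsumerConfig.RECEIVE_BUFFER_CONFIG
        props.setProperty("receive.buffer.bytes", Integer.toString(bufferSizeBytes));
        // fetchMaxWait -> ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG
        props.setProperty("fetch.max.wait.ms", Integer.toString(fetchMaxWaitMs));
        // minFetchByte -> ConsumerConfig.FETCH_MIN_BYTES_CONFIG
        props.setProperty("fetch.min.bytes", Integer.toString(minFetchByte));
        // clientId -> ConsumerConfig.CLIENT_ID_CONFIG
        props.setProperty("client.id", clientId);
        return props;
    }

    public static void main(String[] args) {
        // Example values only; "my-spout" is a placeholder client id.
        Properties props = toConsumerProps(1024 * 1024, 1024 * 1024, 500, 1, "my-spout");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With storm-kafka-client on the classpath, each of these entries would instead be passed to the builder through the setProp calls listed in the table.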

Important document with config explanations:

@srishtyagrawal
Author

srishtyagrawal commented Apr 16, 2018

Thanks a lot, Stig, for the comments; they are very useful and informative.

scheme: Addressed. I missed the KeyValueScheme and hence didn't add KEY DESERIALIZER, thanks for noticing that.

fetchSizeBytes: Addressed. The third link (with the algorithm) was useful in understanding why fetchSizeBytes is analogous to max.partition.fetch.bytes and not fetch.max.bytes. Another thing to note is that the default for fetchSizeBytes is the same as the default for max.partition.fetch.bytes, which supports your theory.

auto.offset.reset: I have modified the default to have an exception depending on the ProcessingGuarantee configuration variable. Can you check if this looks good now?

fetchMaxWait: Thanks, have added the documentation in the table.

maxOffsetBehind: Thanks for the elaborate explanation. I understand this configuration much better now.

I am creating a Pull Request to add this table to the storm-kafka-client documentation, but before I do that I have another question about the links in this gist. Currently all the links point to the Storm-1.0.6 release. Is it ok if I modify these links to point to the latest release, v1.2.1? (The current links in the storm-kafka-client document are broken too.)

@srdo

srdo commented Apr 17, 2018

Thanks, I think this is nearly there. The maxOffsetBehind section says that "If a failing tuple's offset is less than maxOffsetBehind, the spout stops retrying the tuple." Shouldn't it be "more than"? That is, if the latest offset is 100, and you set maxOffsetBehind to 50, and then offset 30 fails, 30 is more than maxOffsetBehind behind the latest offset, so it is not retried.
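The arithmetic in that example can be sketched as follows. This is only an illustration of the rule as described in this comment, not the actual storm-kafka code. The class and method names are hypothetical, and it assumes non-negative offsets and a strict "more than" comparison.

```java
// Illustrative only: hypothetical names, not taken from the storm-kafka source.
class MaxOffsetBehindExample {

    // Returns true if a failed tuple is still close enough to the latest
    // offset to be retried, i.e. it is at most maxOffsetBehind behind it.
    static boolean shouldRetry(long latestOffset, long failedOffset, long maxOffsetBehind) {
        long distanceBehind = latestOffset - failedOffset;
        return distanceBehind <= maxOffsetBehind;
    }

    public static void main(String[] args) {
        // Offset 30 is 70 behind the latest offset 100, and 70 > 50: not retried.
        System.out.println(shouldRetry(100, 30, 50)); // prints "false"
        // Offset 60 is 40 behind, and 40 <= 50: retried.
        System.out.println(shouldRetry(100, 60, 50)); // prints "true"
        // With the default Long.MAX_VALUE, the limit never stops a retry.
        System.out.println(shouldRetry(100, 30, Long.MAX_VALUE)); // prints "true"
    }
}
```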

Regarding the links, I think we should try to use links that automatically point at the right release. There's some documentation about it here https://github.com/apache/storm-site#how-release-specific-docs-work, and example usage "The allowed values are listed in the FirstPollOffsetStrategy javadocs" (from https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md). It would be great if you fix any broken links you find, or any links that are hard coded to point at a specific release.

@srishtyagrawal
Author

I copied the maxOffsetBehind documentation from here. It is confusing because, in your earlier example, the value 30 itself is less than 100-50, but I like the idea of adding "behind" to make it clearer. As there is more than one scenario where maxOffsetBehind is used, I have modified the documentation to give the fail scenario as an example.
Thanks for the documentation on links. I will fix all the existing links and the ones which are currently broken in the storm-kafka-client documentation.

@srishtyagrawal
Author

srishtyagrawal commented Apr 17, 2018

It seems that none of the release-related links in storm-kafka-client.md work. I looked at other docs as well, for example Hooks.md, Concepts.md, Configuration.md, and Common-patterns.md (the first 4 documents I checked for relative links), and these links gave a 404 there too. I have yet to figure out why the relative links don't work.
