Skip to content

Instantly share code, notes, and snippets.

@dasch
Created June 3, 2016 18:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dasch/6fffaa75a774a74869b69e4c81fa8bb3 to your computer and use it in GitHub Desktop.
Save dasch/6fffaa75a774a74869b69e4c81fa8bb3 to your computer and use it in GitHub Desktop.
consumer = kafka.consumer(group_id: "worker")
# If the user block raises an exception, the consumer will move on to the
# other partitions it's handling. Once finished, it will retry, up to x times.
# Once all attempts have been made, the partition is put on a pause list and
# not longer fetched from, until some timeout.
consumer.subscribe("jobs", retries: 3, failure_policy: :pause_partition)
# In this API, it's assumed that we'll pause a partition, so the configuration
# is in terms of the timeout. I guess if 0 is passed we would skip pausing...
consumer.subscribe("jobs", retries: 3, pause_timeout: 30)
consumer.each_message do |message|
process(message)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment