Last active
June 3, 2016 18:29
-
-
Save dasch/12deeea01b19419cf56b32db07439cb1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Sketch of a consumer loop with per-message retries and per-partition
# back-off. Relies on three names defined outside this snippet: `kafka`
# (the client), `process_message` (user code) and `max_attempts`.
consumer = kafka.consumer(group_id: "worker")
consumer.subscribe("jobs")

# This could all be done in a high-level library.
loop do
  # `poll` fetches a set of message batches.
  batches = consumer.poll

  batches.each do |batch|
    begin
      batch.messages.each do |message|
        # The attempt counter is reset for every message.
        attempt = 1

        begin
          # User code processes the message, possibly raising an exception.
          process_message(message)
        rescue StandardError
          # Retry up to `max_attempts` times (so the message is tried at most
          # max_attempts + 1 times in total). Once the retries are used up,
          # re-raise so the batch-level rescue below takes over.
          raise if attempt > max_attempts

          attempt += 1
          retry
        else
          # Mark the message as processed if there was no exception.
          consumer.mark_as_processed(message)
        end
      end
    rescue StandardError
      # Stop processing the batch and pause the partition. Continue with
      # the rest of the batches. The failing message and the ones after it
      # in this batch are not marked as processed.
      # NOTE(review): `duration: 30` is presumably in seconds -- confirm
      # against the consumer API.
      consumer.pause(batch.topic, batch.partition, duration: 30)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment