@dasch
Last active June 3, 2016 18:29
# Assumes `kafka` is an initialized client and `max_attempts` is defined by the caller.
consumer = kafka.consumer(group_id: "worker")
consumer.subscribe("jobs")

# This could all be done in a high-level library.
loop do
  # `poll` fetches a set of message batches.
  batches = consumer.poll

  batches.each do |batch|
    begin
      batch.messages.each do |message|
        attempt = 1

        begin
          # User code processes the message, possibly raising an exception.
          process_message(message)
        rescue => e
          # Retry up to `max_attempts` times, then re-raise to abort the batch.
          if attempt <= max_attempts
            attempt += 1
            retry
          else
            raise
          end
        else
          # Mark the message as processed if there was no exception.
          consumer.mark_as_processed(message)
        end
      end
    rescue => e
      # Stop processing the batch and pause the partition. Continue with
      # the rest of the batches.
      consumer.pause(batch.topic, batch.partition, duration: 30)
    end
  end
end
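
The retry logic in the inner `begin`/`rescue` block can be tried in isolation, without a Kafka client. The following is a minimal sketch, not part of any library: `with_retries` and its `max_retries:` parameter are hypothetical names introduced here for illustration. It keeps the same `attempt <= N` comparison as the snippet above, so the block runs once and is then retried up to N more times before the error is re-raised.

```ruby
# Hypothetical helper, not part of any Kafka client library.
# Calls the block, retrying up to `max_retries` times after a failure.
# Returns the block's value on success; re-raises the last error otherwise.
def with_retries(max_retries:)
  attempt = 1
  begin
    yield attempt
  rescue StandardError
    if attempt <= max_retries
      attempt += 1
      retry
    else
      raise
    end
  end
end

# Usage: a block that fails on its first two calls, then succeeds.
calls = 0
result = with_retries(max_retries: 3) do |attempt|
  calls += 1
  raise "transient failure" if attempt < 3
  "done after #{attempt} attempts"
end
```

In the consumer loop, the re-raised error is what reaches the outer `rescue`, which pauses the partition instead of marking the message as processed.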