@dasch
Last active March 9, 2016 12:21
# When no group id is passed there's no coordination.
consumer = kafka.consumer
consumer.subscribe("topic1")
# You can seek separately for each partition.
consumer.seek("topic1", partition: 0, offset: 42)
consumer.seek("topic1", partition: 1, offset: 13)
# `poll` returns a list of batches -- useful for e.g. checking against
# the highwater mark.
batches = consumer.poll(timeout: 5, max_bytes_per_partition: 1024 * 5)
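batches.each do |batch|
  # An empty batch has no first message, so skip it rather than call
  # `offset` on nil.
  next if batch.messages.empty?

  lag = batch.highwater_mark_offset - batch.messages.first.offset
  logger.info "Consumer lag for #{batch.topic}/#{batch.partition}: #{lag}"
end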
# This is simple enough that a separate message list based API may not
# be needed.
messages = batches.flat_map(&:messages)
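
The lag calculation above can be tried without a broker. The following is a minimal sketch, assuming batches expose `topic`, `partition`, `highwater_mark_offset` and `messages` as in the snippet above. `Message`, `Batch` and `consumer_lag` are hypothetical stand-ins introduced only for illustration and are not part of any existing client API.

```ruby
# Hypothetical stand-ins for the objects a `poll` call might return.
Message = Struct.new(:offset, :value)
Batch = Struct.new(:topic, :partition, :highwater_mark_offset, :messages)

# Returns a hash mapping [topic, partition] to the lag, measured from the
# first message in the batch to the highwater mark, as in the sketch above.
# Empty batches are skipped because they have no first message.
def consumer_lag(batches)
  batches.each_with_object({}) do |batch, lags|
    next if batch.messages.empty?

    first_offset = batch.messages.first.offset
    lags[[batch.topic, batch.partition]] = batch.highwater_mark_offset - first_offset
  end
end

batches = [
  Batch.new("topic1", 0, 50, [Message.new(42, "a"), Message.new(43, "b")]),
  Batch.new("topic1", 1, 13, [])
]

consumer_lag(batches).each do |(topic, partition), lag|
  puts "Consumer lag for #{topic}/#{partition}: #{lag}"
end

# Flattening the batches yields a plain message list.
messages = batches.flat_map(&:messages)
```

Returning a hash keyed by topic and partition keeps the calculation separate from logging, so the same result could feed a metrics reporter instead.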