Skip to content

Instantly share code, notes, and snippets.

@dasch
Last active June 22, 2017 17:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dasch/a83ef6f9d1b7c19ce0f2c0350894e701 to your computer and use it in GitHub Desktop.
Save dasch/a83ef6f9d1b7c19ce0f2c0350894e701 to your computer and use it in GitHub Desktop.
class Consumer
class Fetcher
def initialize(input, output)
@input, @output = input, output
@running = true
end
def run
while @running
if @input.empty?
if @output.size < MAX_PENDING_OUTSPUTS
fetch
else
sleep 1
end
else
cmd, opts = @input.deq
send("handle_#{cmd}", opts)
end
end
end
private
def fetch
# ...
response = @broker.fetch_messages(topics: @topics)
response.topics.each do |topic|
topic.partitions.each do |batch|
if batch.error_code == 0
@output.enq([:batch, batch])
# initiate a prefetch here...
else
@output.enq([:error, batch])
end
end
end
end
def handle_shutdown
@running = false
end
def handle_pause
end
end
def each_batch(&block)
inputs = brokers.map { Queue.new }
output = Queue.new
fetchers = brokers.zip(inputs).map {|broker, input| Fetcher.new(broker, input, output) }
fetcher_threads = fetchers.map {|fetcher| Thread.new { fetcher.run } }
loop do
msg, batch = output.deq
case msg
when :batch
yield batch
when :error
# ?
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment