Skip to content

Instantly share code, notes, and snippets.

@ivoanjo
Created August 27, 2016 00:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ivoanjo/f931fa16f963278354472f67e54a2115 to your computer and use it in GitHub Desktop.
Save ivoanjo/f931fa16f963278354472f67e54a2115 to your computer and use it in GitHub Desktop.
Consuming multiple queues with MarchHare's low-level api
require 'march_hare'
require 'pry'
connection = MarchHare.connect(thread_pool_size: 10, automatically_recover: false)
connection.start
channel = connection.create_channel
# create queues
queue_names = (1..3).map { |n| "queue#{n}" }
queues = queue_names.each do |name|
channel.queue(name)
end
# create consumer. meh api :(
# note: recover_from_network_failure is broken by passing a nil queue (second
# parameter) and this requires automatically_recover: false to be passed when creating
# the connection to it doesn't get triggered
consumer = MarchHare::CallbackConsumer.new(channel, nil, {}, proc { |headers, message|
puts "Got #{message} (Thread=#{Thread.current.object_id})"
sleep 2
channel.basic_ack(headers.envelope.delivery_tag, false)
})
# listen to queues
queue_names.each do |name|
puts "Setting up consume of #{name}"
channel.basic_consume(name, false, consumer)
end
# publish message to each
queue_names.each do |name|
channel.default_exchange.publish("Hello from #{name}!", routing_key: name)
end
sleep 10
connection.close
@axfcampos
Copy link

From reading the code I thought the consumer would be submitted to the thread_pool to be executed. Ran it locally and it is done sequentially... that is weird.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment