Skip to content

Instantly share code, notes, and snippets.

@trevorrowe
Created January 29, 2015 21:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save trevorrowe/2470ceb420edb322a178 to your computer and use it in GitHub Desktop.
Save trevorrowe/2470ceb420edb322a178 to your computer and use it in GitHub Desktop.
A simple **untested** extraction from the V1 SDK for Ruby from AWS::SQS::Queue#poll.
# A simple extraction from the V1 SDK for Ruby from AWS::SQS::Queue#poll.
# No effort was made to test this code.
#
# @example Basic Usage
#
#   poller = QueuePoller.new(queue_url)
#   poller.poll { |msg| puts msg.body }
#
# @example Customized API client
#
#   sqs = Aws::SQS::Client.new(region: 'us-west-2')
#
#   poller = QueuePoller.new(queue_url, client: sqs)
#   poller.poll { |msg| puts msg.body }
#
# NOTE(review): this class references names that are not defined in this
# file and must be supplied by the surrounding code before it can run:
# `ReceivedMessage` (including `ReceivedMessage::ATTRIBUTE_ALIASES`),
# `Errors::ChecksumError`, `Core::Inflection`, and the instance methods
# `verify_receive_message_checksum` and `call_message_block`.
class QueuePoller
  # Long-poll wait (in seconds) used when the caller does not supply
  # `:wait_time_seconds`.
  DEFAULT_WAIT_TIME_SECONDS = 15

  # @param [String] queue_url URL of the queue to poll.
  # @param [Object, nil] client API client that responds to
  #   `receive_message`. When omitted, `Aws::SQS::Client.new` is used.
  def initialize(queue_url, client: nil)
    @queue_url = queue_url
    @client = client || Aws::SQS::Client.new
  end

  # Polls continually for messages. For example, you can use
  # this to poll indefinitely:
  #
  #     poller.poll { |msg| puts msg.body }
  #
  # Or, to poll indefinitely for the first message and then
  # continue polling until no message is received for a period
  # of at least ten seconds:
  #
  #     poller.poll(:initial_timeout => false,
  #                 :idle_timeout => 10) { |msg| puts msg.body }
  #
  # This method is intended to automatically delete each message when
  # the block exits normally.
  # NOTE(review): the block is invoked (and any deletion performed) by
  # `call_message_block`, which is not defined in this file -- confirm
  # its behavior.
  #
  # A block is required: without one, `yield` raises `LocalJumpError`
  # as soon as the first message is received.
  #
  # The given `opts` hash is never modified; polling works on a copy.
  #
  # @yieldparam [ReceivedMessage] message Each message that was received.
  #
  # @param [Hash] opts Options for polling.
  #
  # @option opts [Integer] :wait_time_seconds The number of seconds
  #   the service should wait for a response when requesting a new message.
  #   Defaults to {DEFAULT_WAIT_TIME_SECONDS}. Use `nil` to omit the
  #   parameter from the request, so that the queue's own long polling
  #   wait time setting applies.
  #
  # @option opts [Integer] :idle_timeout The maximum number of
  #   seconds to spend polling while no messages are being
  #   returned. By default this method polls indefinitely
  #   whether messages are received or not.
  #
  # @option opts [Integer] :initial_timeout The maximum number
  #   of seconds to spend polling before the first message is
  #   received. This option defaults to the value of
  #   `:idle_timeout`. You can specify `false` to poll
  #   indefinitely for the first message when `:idle_timeout` is
  #   set.
  #
  # @option opts [Integer] :batch_size The maximum number of
  #   messages to retrieve in a single request. By default
  #   messages are received one at a time. Valid values:
  #   integers from 1 to 10.
  #
  # @option opts [Integer] :visibility_timeout The duration (in
  #   seconds) that the received messages are hidden from
  #   subsequent retrieve requests. Valid values: integer from
  #   0 to 43200 (maximum 12 hours)
  #
  # @option opts [Array<Symbol, String>] :attributes The
  #   attributes to populate in each received message. Valid values:
  #
  #   * `:all` (to populate all attributes)
  #   * `:sender_id`
  #   * `:sent_at`
  #   * `:receive_count`
  #   * `:first_received_at`
  #
  #   See {ReceivedMessage} for documentation on each
  #   attribute's meaning.
  #
  # @option opts [Object] :message_attribute_names Forwarded unchanged
  #   as the `:message_attribute_names` request parameter when present.
  #
  # @option opts [Float, Integer] :poll_interval As of
  #   v1.7.2, this option is no longer used. See the
  #   `:wait_time_seconds` option for long polling instead.
  #
  # @return [nil]
  def poll(opts = {}, &block)
    # Copy first: the normalization below rewrites keys, and the caller
    # may reuse (or have frozen) the hash they passed in.
    opts = opts.dup
    opts[:limit] = opts.delete(:batch_size) if opts.key?(:batch_size)
    # Only default when the key is absent; an explicit nil means
    # "do not send a wait time".
    opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless opts.key?(:wait_time_seconds)

    last_message_at = Time.now
    got_first = false

    loop do
      got_msg = false

      receive_messages(opts) do |message|
        got_msg = got_first = true
        last_message_at = Time.now
        yield(message)
      end

      # Timeouts are only evaluated after an empty receive.
      return if !got_msg && hit_timeout?(got_first, last_message_at, opts)
    end

    nil
  end

  private

  # Issues one receive request and wraps each returned message in a
  # `ReceivedMessage`.
  #
  # @param [Hash] opts Normalized polling options (see {#receive_opts}).
  # @yield When a block is given, the messages and the block are handed
  #   to `call_message_block` and its result is returned.
  # @return [Object] without a block: the array of messages when
  #   `:limit` is set to something other than 1, otherwise the first
  #   message (or nil when none were received).
  # @raise [Errors::ChecksumError] when `verify_receive_message_checksum`
  #   reports any failed messages.
  def receive_messages(opts = {}, &block)
    resp = @client.receive_message(receive_opts(opts))

    failed = verify_receive_message_checksum resp
    raise Errors::ChecksumError.new(failed) unless failed.empty?

    request_id = (resp[:response_metadata] || {})[:request_id]

    # Treat a missing message list as "nothing received" rather than
    # failing on nil.
    messages = (resp[:messages] || []).map do |m|
      ReceivedMessage.new(self, m[:message_id], m[:receipt_handle],
        :body => m[:body],
        :md5 => m[:md5_of_body],
        :request_id => request_id,
        :attributes => m[:attributes],
        :message_attributes => m[:message_attributes])
    end

    if block
      call_message_block(messages, block)
    elsif opts[:limit] && opts[:limit] != 1
      messages
    else
      messages.first
    end
  end

  # Translates polling options into `receive_message` request parameters.
  # Options whose value is nil or false are left out of the request.
  #
  # @param [Hash] opts Normalized polling options.
  # @return [Hash] request parameters, always including `:queue_url`.
  def receive_opts(opts)
    params = { :queue_url => @queue_url }

    params[:visibility_timeout] = opts[:visibility_timeout] if opts[:visibility_timeout]
    params[:max_number_of_messages] = opts[:limit] if opts[:limit]
    params[:wait_time_seconds] = opts[:wait_time_seconds] if opts[:wait_time_seconds]
    params[:message_attribute_names] = opts[:message_attribute_names] if opts[:message_attribute_names]

    if (names = opts[:attributes])
      params[:attribute_names] = names.map do |name|
        key = name.to_sym
        # Replace a known alias with its mapped name first ...
        name = ReceivedMessage::ATTRIBUTE_ALIASES[key] if ReceivedMessage::ATTRIBUTE_ALIASES.key?(key)
        # ... then convert whatever is still a Symbol via the inflector.
        name = Core::Inflection.class_name(name.to_s) if name.kind_of?(Symbol)
        name
      end
    end

    params
  end

  # Decides whether polling should stop after an empty receive.
  #
  # The idle timeout applies once a message has been seen, or when no
  # initial timeout was specified (nil). Otherwise the initial timeout
  # applies; an explicit `false` there disables the limit so polling
  # continues until the first message arrives.
  #
  # @param [Boolean] got_first Whether any message has been received yet.
  # @param [Time] last_message_at When the last message was received
  #   (or when polling started).
  # @param [Hash] opts Polling options (`:initial_timeout`, `:idle_timeout`).
  # @return [Boolean] true when the applicable timeout has elapsed.
  def hit_timeout?(got_first, last_message_at, opts)
    initial_timeout = opts[:initial_timeout]
    idle_timeout = opts[:idle_timeout]

    timeout = (got_first || initial_timeout.nil?) ? idle_timeout : initial_timeout
    # nil/false means "no limit": keep polling.
    return false unless timeout

    Time.now - last_message_at > timeout
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment