Created
January 29, 2015 21:47
-
-
Save trevorrowe/2470ceb420edb322a178 to your computer and use it in GitHub Desktop.
A simple **untested** extraction from the V1 SDK for Ruby from AWS::SQS::Queue#poll.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A simple extraction from the V1 SDK for Ruby from AWS::SQS::Queue#poll.
# No effort was made to test this code.
#
# @example Basic Usage
#
#   poller = QueuePoller.new(queue_url)
#   poller.poll { |msg| puts msg.body }
#
# @example Customized API client
#
#   sqs = Aws::SQS::Client.new(region: 'us-west-2')
#
#   poller = QueuePoller.new(queue_url, client: sqs)
#   poller.poll { |msg| puts msg.body }
#
class QueuePoller
  # Long-poll wait (in seconds) used by {#poll} when the caller does not
  # supply `:wait_time_seconds`.
  DEFAULT_WAIT_TIME_SECONDS = 15

  # @param queue_url The queue URL; it is sent as `:queue_url` with every
  #   receive request.
  # @param client The API client used to receive messages. When omitted
  #   (or `nil`) a default `Aws::SQS::Client` is constructed.
  def initialize(queue_url, client: nil)
    @queue_url = queue_url
    @client = client || Aws::SQS::Client.new
  end

  # Polls continually for messages. For example, you can use
  # this to poll indefinitely:
  #
  #     poller.poll { |msg| puts msg.body }
  #
  # Or, to poll indefinitely for the first message and then
  # continue polling until no message is received for a period
  # of at least ten seconds:
  #
  #     poller.poll(:initial_timeout => false,
  #                 :idle_timeout => 10) { |msg| puts msg.body }
  #
  # Received messages are handed to the block through
  # `call_message_block`.
  # NOTE(review): in the V1 SDK this helper deleted each message when the
  # block exits normally; the helper is not defined in this file, so
  # confirm that behavior against whatever implementation is mixed in.
  #
  # The given `opts` hash is never modified; a copy is normalized
  # internally.
  #
  # @yieldparam [ReceivedMessage] message Each message that was received.
  #
  # @param [Hash] opts Options for polling.
  #
  # @option opts [Integer] :wait_time_seconds The number of seconds
  #   the service should wait for a response when requesting a new message.
  #   Defaults to {DEFAULT_WAIT_TIME_SECONDS}. Use `nil` to omit the
  #   parameter from the request so the queue's own long polling wait
  #   time setting applies.
  #
  # @option opts [Integer] :idle_timeout The maximum number of
  #   seconds to spend polling while no messages are being
  #   returned. By default this method polls indefinitely
  #   whether messages are received or not.
  #
  # @option opts [Integer] :initial_timeout The maximum number
  #   of seconds to spend polling before the first message is
  #   received. This option defaults to the value of
  #   `:idle_timeout`. You can specify `false` to poll
  #   indefinitely for the first message when `:idle_timeout` is
  #   set.
  #
  # @option opts [Integer] :batch_size The maximum number of
  #   messages to retrieve in a single request (sent as
  #   `:max_number_of_messages`). When omitted the parameter is not
  #   sent. Valid values: integers from 1 to 10.
  #
  # @option opts [Integer] :visibility_timeout The duration (in
  #   seconds) that the received messages are hidden from
  #   subsequent retrieve requests. Valid values: integer from
  #   0 to 43200 (maximum 12 hours)
  #
  # @option opts [Array<Symbol, String>] :attributes The
  #   attributes to populate in each received message. Valid values:
  #
  #   * `:all` (to populate all attributes)
  #   * `:sender_id`
  #   * `:sent_at`
  #   * `:receive_count`
  #   * `:first_received_at`
  #
  #   A single value is accepted as well as an array.
  #
  # @option opts [Array<String>] :message_attribute_names Passed through
  #   unchanged as the `:message_attribute_names` request parameter.
  #
  # @option opts [Float, Integer] :poll_interval As of
  #   v1.7.2, this option is no longer used. See the
  #   `:wait_time_seconds` option for long polling instead.
  #
  # @return [nil]
  def poll(opts = {}, &block)
    # Work on a copy: the caller's hash may be frozen or reused for
    # another call, and must not gain :limit / lose :batch_size.
    opts = opts.dup
    opts[:limit] = opts.delete(:batch_size) if opts.key?(:batch_size)
    opts[:wait_time_seconds] = DEFAULT_WAIT_TIME_SECONDS unless opts.key?(:wait_time_seconds)

    # Monotonic timestamps keep the timeouts immune to wall-clock changes.
    last_message_at = monotonic_now
    got_first = false

    loop do
      got_msg = false
      receive_messages(opts) do |message|
        got_msg = got_first = true
        # Stamped before yielding, so time spent in the caller's block
        # counts toward the idle period.
        last_message_at = monotonic_now
        yield(message)
      end
      break if !got_msg && hit_timeout?(got_first, last_message_at, opts)
    end

    nil
  end

  private

  # Issues one receive request and wraps each returned message.
  #
  # NOTE(review): `verify_receive_message_checksum`, `call_message_block`,
  # `ReceivedMessage` and `Errors::ChecksumError` are not defined in this
  # file; they come from the V1 SDK this code was extracted from and must
  # be supplied before this method can run.
  #
  # @param [Hash] opts Normalized polling options (see {#receive_opts}).
  # @return With a block, whatever `call_message_block` returns; without
  #   one, the message array when `:limit` is set and not 1, otherwise
  #   the first message (or `nil`).
  def receive_messages(opts = {}, &block)
    resp = @client.receive_message(receive_opts(opts))

    failed = verify_receive_message_checksum resp
    raise Errors::ChecksumError.new(failed) unless failed.empty?

    request_id = (resp[:response_metadata] || {})[:request_id]
    messages = resp[:messages].map do |m|
      ReceivedMessage.new(self, m[:message_id], m[:receipt_handle],
                          :body => m[:body],
                          :md5 => m[:md5_of_body],
                          :request_id => request_id,
                          :attributes => m[:attributes],
                          :message_attributes => m[:message_attributes])
    end

    if block
      call_message_block(messages, block)
    elsif opts[:limit] && opts[:limit] != 1
      messages
    else
      messages.first
    end
  end

  # Translates polling options into `receive_message` request parameters.
  # Options that are absent or `nil` are left out of the request.
  #
  # @param [Hash] opts Normalized polling options.
  # @return [Hash] Request parameters, always including `:queue_url`.
  def receive_opts(opts)
    params = { :queue_url => @queue_url }
    params[:visibility_timeout] = opts[:visibility_timeout] if opts[:visibility_timeout]
    params[:max_number_of_messages] = opts[:limit] if opts[:limit]
    params[:wait_time_seconds] = opts[:wait_time_seconds] if opts[:wait_time_seconds]
    params[:message_attribute_names] = opts[:message_attribute_names] if opts[:message_attribute_names]

    names = opts[:attributes]
    if names
      # Array() lets callers pass a single attribute (e.g. :all) as well
      # as a list.
      # NOTE(review): ReceivedMessage::ATTRIBUTE_ALIASES and
      # Core::Inflection are V1 SDK names not defined in this file.
      params[:attribute_names] = Array(names).map do |name|
        aliases = ReceivedMessage::ATTRIBUTE_ALIASES
        name = aliases[name.to_sym] if aliases.key?(name.to_sym)
        name = Core::Inflection.class_name(name.to_s) if name.kind_of?(Symbol)
        name
      end
    end

    params
  end

  # Decides whether polling should stop after an empty receive.
  #
  # Before the first message the `:initial_timeout` applies, falling back
  # to `:idle_timeout` when it is `nil`; an `:initial_timeout` of `false`
  # means "wait forever for the first message". After the first message
  # only `:idle_timeout` applies. No applicable timeout means never stop.
  #
  # @param [Boolean] got_first Whether any message has been received yet.
  # @param [Float] last_message_at Monotonic timestamp (seconds) of the
  #   last received message, or of the start of polling.
  # @param [Hash] opts Polling options.
  # @return [Boolean]
  def hit_timeout?(got_first, last_message_at, opts)
    initial_timeout = opts[:initial_timeout]
    timeout = (got_first || initial_timeout.nil?) ? opts[:idle_timeout] : initial_timeout
    return false unless timeout

    monotonic_now - last_message_at > timeout
  end

  # Current reading of the monotonic clock, in seconds.
  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment