Skip to content

Instantly share code, notes, and snippets.

@IronSavior
Last active August 9, 2016 18:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IronSavior/d9277e5fa845a76340a1 to your computer and use it in GitHub Desktop.
Save IronSavior/d9277e5fa845a76340a1 to your computer and use it in GitHub Desktop.
Poll for SQS messages
# Author: Erik Elmore <erik@erikelmore.com>
# License: Public Domain
require 'uri'
require 'aws-sdk'
# Facilitates polling and handling queue messages.
class SQSMessageDispatch
# Configure a new SQSMessageDispatch for the given queue URL.
# @param queue_url [String] URL of the SQS message queue
# @param sqs_opts [Hash] parameters passed to Aws::SQS::Client.new (except :region)
def initialize( queue_url, sqs_opts = {} )
@url = String queue_url
@sqs = Aws::SQS::Client.new(
{ region: URI.parse(@url).host.match(/^sqs\.([^.]+)\./)[1] }.merge sqs_opts
)
end
# Poll message queue continuously until #stop is called. Yields each message to the given block. Message is
# implicitly deleted when the block is finished except when the block raises or throws. Returns Enumerator when
# block is not given.
# @param args [Hash] Additional parameters for Aws::SQS::Client#receive_message
def poll( *args, &blk )
return enum_for __callee__ unless block_given?
raise StandardError, 'Already polling' if polling?
safe_poll *args, &blk
end
alias_method :each_message, :poll
# True when currently polling
def polling?
@polling
end
# End the polling loop after the current iteration.
def stop
return unless @polling
@polling = false
end
# Easy on the eyes
def inspect
'#<%s @url="%s"%s>' % [
self.class,
@url,
polling?? ' (polling)' : nil
]
end
# Safe poll and dispatch
def safe_poll( *args, &blk )
@polling = true
receive_messages(*args).each{ |msg| dispatch_message msg, &blk } while polling?
ensure
@polling = false
end
private :safe_poll
# Safe dispatch of received message. Traps errors and invokes message handler. Messages are implicitly deleted
# unless the handler raises or throws.
# @param msg [Aws::Structure] received message
def dispatch_message( msg )
yield msg
delete_message msg
rescue => e
warn 'Error from handler: "%s" Trace: %s Received Message: %s' % [
e.message,
"\n\t#{e.backtrace.join("\n\t")}\n",
msg.to_h
]
end
private :dispatch_message
# Fetch messages from the queue.
# @param opts [Hash] Additional parameters for Aws::SQS::Client#receive_message except :queue_url
# @return [Array<Aws::Structure>] Zero or more messages
def receive_messages( opts = {} )
@sqs.receive_message(opts.merge queue_url: @url).messages
rescue Aws::SQS::Errors::OverLimit
warn 'Too many in-flight messages at this time'
Array[]
end
private :receive_messages
# Remove a received message from the queue
# @param msg [Aws::Structure] a received message
def delete_message( msg )
@sqs.delete_message queue_url: @url, receipt_handle: msg.receipt_handle
rescue Aws::SQS::Errors::ReceiptHandleIsInvalid
warn 'Cannot delete message with invalid receipt handle: %s' % msg.to_h
end
private :delete_message
end
if $0 == __FILE__
raise 'usage: %s queue_url' % File.basename($0) if ARGV.empty?
SQSMessageDispatch.new(ARGV.first).tap do |worker|
Signal.trap('INT'){
puts
worker.stop
}
worker.poll wait_time_seconds: 3, max_number_of_messages: 10 do |msg|
puts '*** Recieved message: %s' % msg.body
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment