Skip to content

Instantly share code, notes, and snippets.

@IronSavior
Last active August 29, 2015 14:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IronSavior/24c9ed577224887982f1 to your computer and use it in GitHub Desktop.
Save IronSavior/24c9ed577224887982f1 to your computer and use it in GitHub Desktop.
Temporary SQS queues and SNS topics
# Author: Erik Elmore <erik@erikelmore.com>
# License: Public Domain
require 'securerandom'
require 'aws'
# Test the feasability of using temporary queues and topics as a means of receiving
# notifications from remote systems via SQS and SNS. TestClient creates either only a
# queue or both a queue and a topic before sending a request to the server. When the
# client wishes to receive its response directly in a temporary queue, the request
# body is the URL to its temporary queue. When the client wishes to be notified via a
# topic with a subscribed queue, the body of the request is the ARN of the temporary
# SNS topic.
#
# My observations at this time indicate that the setup costs of using either method
# is less than 1 second, but the full cycle (request to response) is about 4x faster
# when using temporary SQS queues without a temporary SNS topic.
class TestClient
def initialize( name, region, server_queue )
@name = name
@sqs = AWS::SQS.new :region => region
@sns = AWS::SNS.new :region => region
@server_queue = server_queue
end
def run( reply_via_sns = true )
send reply_via_sns ? :temporary_queue_via_sns : :temporary_queue do |queue, topic|
req = reply_via_sns ? topic.arn : queue.url
@server_queue.send_message req
puts 'Request sent: %s' % req
queue.poll :attributes => [:all] do |m|
handler m
break
end
end
end
def temporary_queue_via_sns( topic_base_name = @name, queue_base_name = @name )
raise ArgumentError, 'Block required' unless block_given?
temporary_topic(topic_base_name){ |topic|
temporary_queue(queue_base_name){ |queue|
begin
sub = topic.subscribe queue
sub.raw_message_delivery = true
yield queue, topic, sub
ensure
sub.unsubscribe if sub && sub.exists?
end
}
}
end
def temporary_topic( base_name = @name )
raise ArgumentError, 'Block required' unless block_given?
topic = create_temporary_topic base_name
yield topic
ensure
topic.delete if topic
end
def temporary_queue( base_name = @name )
raise ArgumentError, 'Block required' unless block_given?
queue = create_temporary_queue base_name
yield queue
ensure
queue.delete if queue && queue.exists?
end
def create_temporary_queue( base_name = @name )
@sqs.queues.create randomized_name(base_name)
rescue AWS::SQS::Errors::QueueNameExists, AWS::SQS::Errors::QueueDeletedRecently
retry
end
def create_temporary_topic( base_name = @name )
name = nil
begin
name = randomized_name base_name
end while @sns.topics.detect{|t| t.name == name }
@sns.topics.create name
end
def randomized_name( base_name = @name )
'%s-%s' % [base_name, SecureRandom.uuid]
end
def handler( msg )
puts 'Received: "%s"' % msg.body
end
end
if $0 == __FILE__
server_queue = AWS::SQS.new(:region => 'us-west-2').queues.named 'messaging_test_server'
client_region = ARGV[0] || 'us-west-2'
using_sns = ARGV.size > 1
catch :terminate do
Signal.trap('INT'){ throw :terminate }
loop do
TestClient.new('messaging_test_client', client_region, server_queue).run using_sns
puts
end
end
puts
puts "Stopped"
end
# Author: Erik Elmore <erik@erikelmore.com>
# License: Public Domain
require 'aws'
require 'uri'
# Listens to the given AWS::SQS::Queue and sends responses based on the body of
# the request.
class TestServer
def initialize( queue )
@queue = queue
end
def run
puts 'Listening...'
@terminate = false
loop do
break if @terminate
begin
@queue.receive_message(:attributes => [:all]){|m| handler m }
rescue => e
puts 'Error (incoming message ignored): %s' % e.message
puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
end
end
puts
puts 'Server shut down'
end
def stop
@terminate = true
end
def arn_region( arn )
arn.split(':')[3]
end
def url_region( url )
URI.parse(url).host.match(/^sqs\.(.+?)\./)[1]
end
def reply_to( msg, reply )
if msg.body.start_with? 'arn:aws:sns:'
AWS::SNS.new(
:region => arn_region(msg.body)
).topics[msg.body].publish reply
else
AWS::SQS.new(
:region => url_region(msg.body)
).queues[msg.body].send_message reply
end
end
def handler( msg )
puts 'Received: %s' % msg.body
if msg.receive_count > 3
puts 'WARN: Message received more than 3 times. Deleting.'
return
end
reply_to msg, 'Valid Response'
end
end
if $0 == __FILE__
queue = lambda { |queue_name, sqs|
begin
sqs.queues.named queue_name
rescue AWS::SQS::Errors::NonExistentQueue
sqs.queues.create queue_name, :message_retention_period => 10
end
}.call 'messaging_test_server', AWS::SQS.new(:region => 'us-west-2')
TestServer.new(queue).tap{ |server|
Signal.trap('INT'){ server.stop }
}.run
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment