-
-
Save chussenot/6f61227e5ea9d3ec2fb866892f5782f3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# rubocop:disable Style/HashSyntax
# Tasks for Shoryuken
namespace :queues do | |
# Placeholder for provisioning the default SNS topic and SQS queue.
# The action body is intentionally empty: nothing is created yet.
# Reference walkthrough: http://qiita.com/takeyuweb/items/cdc262d97c3e863c15ff
desc 'Create default SNS & SQS'
task :setup => :environment do
  # TODO: not implemented.
end
# Prints every queue URL returned by `queues`, one per line.
desc "List all SQS queues"
task :list => :environment do
  queues.each do |url|
    puts url
  end
end
# Prints the attributes of one queue.
#
# Arguments:
#   identifier - queue name or queue URL (required).
#
# Raises RuntimeError when no identifier is given.
desc "Get details for a specific SQS Queue"
task :queue, [:identifier] => :environment do |_task, arguments|
  identifier = arguments.identifier
  fail 'No SQS queue name or url was specified' unless identifier.present?

  puts queue(identifier)
end
# Makes sure the queue named by the SQS_FAILURE_QUEUE env variable exists,
# creating it when it is not among the current queue names.
#
# Raises RuntimeError when SQS_FAILURE_QUEUE is not set.
desc "Ensure existence of a SQS Dead Letter Queue"
task :dead_letter_queue => :environment do
  name = ENV['SQS_FAILURE_QUEUE']
  fail 'SQS_FAILURE_QUEUE env variable is not set' if name.blank?

  # Nothing to create when the queue is already there.
  if queue_names.include?(name)
    puts "SQS Dead Letter Queue '#{name}' exists"
    next
  end

  sqs.create_queue(queue_name: name)
  puts "SQS Dead Letter Queue '#{name}' created"
end
# Creates an SQS queue carrying a RedrivePolicy that targets the dead letter
# queue named by the SQS_FAILURE_QUEUE env variable.
#
# Arguments:
#   name    - name of the queue to create (required).
#   retries - value written as maxReceiveCount in the policy (default: 7).
#
# The :dead_letter_queue prerequisite always runs before this action, so the
# dead letter queue is looked up here without invoking that task again.
#
# Raises RuntimeError when no queue name is given.
desc "Create a new SQS queue with dead job support"
task :create, [:name, :retries] => :dead_letter_queue do |_t, args|
  name = args.name
  fail "An SQS queue name must be specified" if name.blank?

  if queue_names.include?(name)
    puts "SQS Queue '#{name}' exists"
  else
    retries = args.retries || 7
    arn = queue(ENV['SQS_FAILURE_QUEUE'])['QueueArn']
    # Serialize the policy rather than hand-writing JSON so the quoting is
    # always well-formed.
    redrive_policy = {
      'maxReceiveCount' => retries.to_s,
      'deadLetterTargetArn' => arn
    }.to_json
    sqs.create_queue(queue_name: name, attributes: { 'RedrivePolicy' => redrive_policy })
    puts "SQS Queue '#{name}' created"
  end
end
# Creates the project's default queue by delegating to queues:create.
#
# Arguments:
#   retries - forwarded to queues:create (default: 7).
#
# Raises RuntimeError when the SQS_DEFAULT_QUEUE env variable is not set.
desc "Setup the default SQS queues for a new project"
task :setup, [:retries] => :environment do |_task, arguments|
  default_queue = ENV['SQS_DEFAULT_QUEUE']
  fail 'SQS_DEFAULT_QUEUE env variable is not set' if default_queue.blank?

  Rake::Task['queues:create'].invoke(default_queue, arguments.retries || 7)
end
# Deletes a single SQS queue.
#
# Arguments:
#   identifier - queue name or queue URL (required).
#
# Raises RuntimeError when no identifier is given or when the identifier
# cannot be resolved to a queue URL.
desc "Delete an SQS queue"
task :delete, [:identifier] => :environment do |_t, args|
  identifier = args.identifier
  fail 'No SQS queue name or url was specified' if identifier.blank?

  url = queue_url(identifier)
  # queue_url yields nil for an unknown name; stop before calling the API.
  fail "SQS Queue '#{identifier}' was not found" if url.nil?

  sqs.delete_queue(queue_url: url)
  puts "SQS Queue '#{identifier}' deleted"
end
# Deletes every SQS queue after an interactive confirmation.
#
# Reads one line from standard input; anything other than 'CONFIRM'
# (including end of input) aborts with a RuntimeError.
desc "Delete all SQS queues"
task :delete_all => :environment do
  $stdout.puts "Are you sure you want to delete all SQS Queues? Type 'CONFIRM' to confirm:"
  # gets returns nil at end of input; to_s keeps chomp from raising.
  input = $stdin.gets.to_s.chomp
  unless input == 'CONFIRM'
    fail "Aborting deletion of SQS Queues. You entered: #{input}"
  end

  delete_task = Rake::Task['queues:delete']
  queues.each do |url|
    delete_task.invoke(url)
    # A Rake task runs once per invoke; re-enable it for the next queue.
    delete_task.reenable
  end
end
# Returns the memoized Aws::SQS::Client, building it on first use.
def sqs
  return @sqs if @sqs

  @sqs = Aws::SQS::Client.new
end
# Returns the memoized Aws::SNS::Client, building it on first use.
def sns
  return @sns if @sns

  @sns = Aws::SNS::Client.new
end
# Collects the queue URLs from every page of the list_queues response.
#
# Returns an Array of queue URLs.
def queues
  sqs.list_queues.each_with_object([]) do |page, urls|
    urls.concat(page.queue_urls)
  end
end
# Returns the name of every queue, i.e. the part of each queue URL after its
# last '/'.
def queue_names
  queues.map do |url|
    _head, _separator, name = url.rpartition('/')
    name
  end
end
# Resolves a queue name or URL to a queue URL.
#
# identifier - a queue URL (starting with http:// or https://) or a queue name.
#
# Returns the identifier itself when it is already a URL; otherwise the first
# URL from `queues` whose segment after the last '/' equals the identifier,
# or nil when no queue matches.
def queue_url(identifier)
  # \A anchors at the start of the string; ^ would also match right after a
  # newline and misclassify multi-line input as a URL.
  return identifier if %r{\Ahttps?://}.match(identifier)

  queues.find { |url| url.rpartition('/').last == identifier }
end
# Fetches all attributes of a queue.
#
# identifier - queue name or queue URL, resolved through queue_url.
#
# Returns the `attributes` of the get_queue_attributes response.
def queue(identifier)
  url = queue_url(identifier)
  response = sqs.get_queue_attributes(queue_url: url, attribute_names: ['All'])
  response.attributes
end
# Checks whether an SNS topic exists.
#
# topic_name - topic name, converted to an ARN through Shoryuken::SnsArn.
#
# Returns the get_topic_attributes response (truthy) when the lookup
# succeeds, or nil when SNS reports the topic as not found.
def exist_sns_topic?(topic_name)
  arn = Shoryuken::SnsArn.new(topic_name).to_s
  # The client takes a params hash; the ARN goes under :topic_arn.
  sns.get_topic_attributes(topic_arn: arn)
rescue Aws::SNS::Errors::NotFound
  nil
end
# Creates an SNS topic and an SQS queue of the same name, subscribes the queue
# to the topic with raw message delivery, and sets a queue policy that lets
# the topic send messages to the queue.
#
# identifier - name used for both the topic and the queue.
#
# Returns false without creating anything when exist_sns_topic? is truthy for
# the identifier; otherwise the result of the final set_queue_attributes call.
def create_topic_and_queue(identifier)
  return false if exist_sns_topic?(identifier)

  # Create the destination SNS topic.
  topic_arn = sns.create_topic(name: identifier).topic_arn

  # Create the destination queue.
  created_queue_url = sqs.create_queue(
    queue_name: identifier,
    attributes: { ReceiveMessageWaitTimeSeconds: '20' }
  ).queue_url

  # Subscribe the queue to the topic so that topic messages reach it.
  # The subscription endpoint is the queue's ARN, so fetch that first.
  queue_arn = sqs.get_queue_attributes(
    queue_url: created_queue_url,
    attribute_names: ['QueueArn']
  ).attributes['QueueArn']

  # Request the subscription using the ARN obtained above.
  subscription_arn = sns.subscribe(
    topic_arn: topic_arn,
    protocol: 'sqs',
    endpoint: queue_arn
  ).subscription_arn

  # The SNS metadata is not needed here, so pass published messages to the
  # queue as-is.
  sns.set_subscription_attributes(
    subscription_arn: subscription_arn,
    attribute_name: 'RawMessageDelivery',
    attribute_value: 'true'
  )

  # Allow the SNS topic to add messages to the SQS queue.
  # http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html#SendMessageToSQS.sqs.permissions
  statement = {
    Sid: 'NotificationsToSQS',
    Effect: 'Allow',
    Principal: '*',
    Action: 'sqs:SendMessage',
    Resource: queue_arn,
    Condition: { ArnEquals: { 'aws:SourceArn' => topic_arn } }
  }
  access_policy = { Version: '2012-10-17', Statement: [statement] }

  sqs.set_queue_attributes(
    queue_url: created_queue_url,
    attributes: { Policy: access_policy.to_json }
  )
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment