Skip to content

Instantly share code, notes, and snippets.

@chussenot
Forked from masuidrive/queues.rake
Created March 13, 2018 15:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chussenot/6f61227e5ea9d3ec2fb866892f5782f3 to your computer and use it in GitHub Desktop.
Save chussenot/6f61227e5ea9d3ec2fb866892f5782f3 to your computer and use it in GitHub Desktop.
# rubocop:disable Style/HashSyntax
# shoryuken用のタスク
namespace :queues do
desc 'Create default SNS & SQS'
task 'setup' => :environment do
# http://qiita.com/takeyuweb/items/cdc262d97c3e863c15ff
# TODO
end
desc "List all SQS queues"
task :list => :environment do
queues.each { |queue_url| puts queue_url }
end
desc "Get details for a specific SQS Queue"
task :queue, [:identifier] => :environment do |_t, args|
if args.identifier.present?
puts queue(args.identifier)
else
fail 'No SQS queue name or url was specified'
end
end
desc "Ensure existence of a SQS Dead Letter Queue"
task :dead_letter_queue => :environment do
name = ENV['SQS_FAILURE_QUEUE']
fail 'SQS_FAILURE_QUEUE env variable is not set' if name.blank?
if queue_names.include?(name)
puts "SQS Dead Letter Queue '#{name}' exists"
else
sqs.create_queue(queue_name: name)
puts "SQS Dead Letter Queue '#{name}' created"
end
end
desc "Create a new SQS queue with dead job support"
task :create, [:name, :retries] => :dead_letter_queue do |_t, args|
fail "An SQS queue name must be specified" if args.name.blank?
if queue_names.include?(name)
puts "SQS Queue '#{name}' exists"
else
Rake::Task['queues:dead_letter_queue'].invoke
retries = args.retries || 7
arn = queue(ENV['SQS_FAILURE_QUEUE'])['QueueArn']
attrs = { 'RedrivePolicy' =>
%({"maxReceiveCount":"#{retries}", "deadLetterTargetArn":"#{arn}"}")
}
sqs.create_queue(queue_name: name, attributes: attrs)
puts "SQS Queue '#{name}' created"
end
end
desc "Setup the default SQS queues for a new project"
task :setup, [:retries] => :environment do |_t, args|
name = ENV['SQS_DEFAULT_QUEUE']
fail 'SQS_DEFAULT_QUEUE env variable is not set' if name.blank?
retries = args.retries || 7
Rake::Task['queues:create'].invoke(name, retries)
end
desc "Delete an SQS queue"
task :delete, [:identifier] => :environment do
fail 'No SQS queue name or url was specified' if args.identifier.blank?
sqs.delete_queue(queue_url: queue_url(args.identifier), attributes: attrs)
puts "SQS Queue '#{name}' deleted"
end
desc "Delete all SQS queues"
task :delete_all => :environment do
STDOUT.puts "Are sure you want to delete all SQS Queues? Type 'CONFIRM' to confirm:"
input = STDIN.gets.chomp
unless input == 'CONFIRM'
fail "Aborting deletion of SQS Queues. You entered: #{input}"
end
queues.each do |queue|
Rake::Task['queues:delete'].invoke(queue)
end
end
def sqs
@sqs ||= Aws::SQS::Client.new
end
def sns
@sns ||= Aws::SNS::Client.new
end
def queues
sqs.list_queues.inject([]) do |list, page|
list.concat(page.queue_urls)
end
end
def queue_names
queues.map { |queue| queue.rpartition('/').last }
end
def queue_url(identifier)
if %r{^https?://}.match(identifier)
identifier
else
queues.find { |queue| queue.rpartition('/').last == identifier }
end
end
def queue(identifier)
sqs.get_queue_attributes(queue_url: queue_url(identifier), attribute_names: ['All']).attributes
end
def exist_sns_topic?(topic_name)
arn = Shoryuken::SnsArn.new(topic_name).to_s
sns.get_topic_attributes(arn)
rescue Aws::SNS::Errors::NotFound
nil
end
def create_topic_and_queue(identifier)
return false if exist_sns_topic?(identifier)
# 通知先SNSトピック作成
resp = sns.create_topic(
name: identifier # required
)
topic_arn = resp.topic_arn
# 通知先キュー作成
resp = sqs.create_queue(
queue_name: identifier, # required
attributes: {
ReceiveMessageWaitTimeSeconds: '20'
}
)
qurl = resp.queue_url
# トピックへのメッセージを作成した通知先キューへ送るように購読の設定
# キューのARNを取得
resp = sqs.get_queue_attributes(
queue_url: qurl,
attribute_names: %w(QueueArn)
)
queue_arn = resp.attributes['QueueArn']
# 取得したARNを使って購読申込
resp = sns.subscribe(
topic_arn: topic_arn, # required
protocol: 'sqs', # required
endpoint: queue_arn
)
subscription_arn = resp.subscription_arn
# 今回はSNSによるメタ情報は不要なので送信したメッセージをそのままキューに送る
sns.set_subscription_attributes(
subscription_arn: subscription_arn, # required
attribute_name: 'RawMessageDelivery',
attribute_value: 'true'
)
# SNSトピックからSQSキューへのメッセージの追加を許可
# http://docs.aws.amazon.com/ja_jp/sns/latest/dg/SendMessageToSQS.html#SendMessageToSQS.sqs.permissions
policy = {
Version: '2012-10-17',
Statement: [
{
Sid: 'NotificationsToSQS',
Effect: 'Allow',
Principal: '*',
Action: 'sqs:SendMessage',
Resource: queue_arn,
Condition: {
ArnEquals: {
'aws:SourceArn' => topic_arn
}
}
}
]
}
sqs.set_queue_attributes(
queue_url: qurl,
attributes: {
Policy: policy.to_json
}
)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment