Skip to content

Instantly share code, notes, and snippets.

@tomwans
Created June 19, 2015 20:08
Show Gist options
  • Save tomwans/23e780f870a155f739ed to your computer and use it in GitHub Desktop.
Save tomwans/23e780f870a155f739ed to your computer and use it in GitHub Desktop.
require 'sensu/transport/base'
require 'aws-sdk'
module Sensu
module Transport
class SNSSQS < Sensu::Transport::Base
attr_accessor :logger
def initialize
@connected = false
@subscribing = false
end
def connected?; @connected; end
def connect(settings)
@settings = settings
@connected = true
@sqs = Aws::SQS::Client.new(region: @settings[:region])
@sns = Aws::SNS::Client.new(region: @settings[:region])
end
# subscribe will begin "subscribing" to the consuming sqs queue.
#
# We assume that the SQS Queue is consuming "Raw" messages from
# SNS.
#
# "subscribing" means that the "callback" parameter will be
# called when there is a message for you to consume.
#
# The pipe, funnel, and type parameters are completely ignored,
# as SQS has no mechanism to filter messages.
def subscribe(type, pipe, funnel = nil, options = {}, &callback)
self.logger.info("subscribing to type=#{type}, pipe=#{pipe}, funnel=#{funnel}")
unless @subscribing
do_all_the_time {
receive_messages.map do |msg|
EM.defer { callback.call(msg, msg.body) } if callback
end
}
@subscribing = true
end
end
def acknowledge(info, &callback)
EM.defer {
# we just consume every msg
@sqs.delete_message(
queue_url: @settings[:consuming_sqs_queue_url],
receipt_handle: info.receipt_handle,
)
callback.call(info) if callback
}
end
# publish publishes a message to an SNS topic.
#
# The type, pipe, and options are transformed into SNS message
# attributes and included with the message.
def publish(type, pipe, message, options = {}, &callback)
attributes = {}
attributes["type"] = str_attr(type)
attributes["pipe"] = str_attr(pipe)
options.each do |k, v|
attributes[k.to_s] = str_attr(v.to_s)
end
EM.defer { send_message(message, attributes, &callback) }
end
private
def str_attr(str)
{ :data_type => "String", :string_value => str }
end
def do_all_the_time(&blk)
EM.defer { blk.call(); do_all_the_time(&blk) }
end
def send_message(msg, attributes, &callback)
resp = @sns.publish(
target_arn: @settings[:publishing_sns_topic_arn],
message: msg,
message_attributes: attributes
)
callback.call({ :response => resp }) if callback
end
# receive_messages returns an array of SQS messages
# for the consuming queue
def receive_messages
begin
resp = @sqs.receive_message(
queue_url: @settings[:consuming_sqs_queue_url],
wait_time_seconds: @settings[:wait_time_seconds],
max_number_of_messages: @settings[:max_number_of_messages],
)
resp.messages
rescue Aws::SQS::Errors::ServiceError => e
self.logger.info(e)
end
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment