Created
June 19, 2015 20:08
-
-
Save tomwans/23e780f870a155f739ed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'sensu/transport/base' | |
require 'aws-sdk' | |
module Sensu | |
module Transport | |
class SNSSQS < Sensu::Transport::Base | |
attr_accessor :logger | |
def initialize | |
@connected = false | |
@subscribing = false | |
end | |
def connected?; @connected; end | |
def connect(settings) | |
@settings = settings | |
@connected = true | |
@sqs = Aws::SQS::Client.new(region: @settings[:region]) | |
@sns = Aws::SNS::Client.new(region: @settings[:region]) | |
end | |
# subscribe will begin "subscribing" to the consuming sqs queue. | |
# | |
# We assume that the SQS Queue is consuming "Raw" messages from | |
# SNS. | |
# | |
# "subscribing" means that the "callback" parameter will be | |
# called when there is a message for you to consume. | |
# | |
# The pipe, funnel, and type parameters are completely ignored, | |
# as SQS has no mechanism to filter messages. | |
def subscribe(type, pipe, funnel = nil, options = {}, &callback) | |
self.logger.info("subscribing to type=#{type}, pipe=#{pipe}, funnel=#{funnel}") | |
unless @subscribing | |
do_all_the_time { | |
receive_messages.map do |msg| | |
EM.defer { callback.call(msg, msg.body) } if callback | |
end | |
} | |
@subscribing = true | |
end | |
end | |
def acknowledge(info, &callback) | |
EM.defer { | |
# we just consume every msg | |
@sqs.delete_message( | |
queue_url: @settings[:consuming_sqs_queue_url], | |
receipt_handle: info.receipt_handle, | |
) | |
callback.call(info) if callback | |
} | |
end | |
# publish publishes a message to an SNS topic. | |
# | |
# The type, pipe, and options are transformed into SNS message | |
# attributes and included with the message. | |
def publish(type, pipe, message, options = {}, &callback) | |
attributes = {} | |
attributes["type"] = str_attr(type) | |
attributes["pipe"] = str_attr(pipe) | |
options.each do |k, v| | |
attributes[k.to_s] = str_attr(v.to_s) | |
end | |
EM.defer { send_message(message, attributes, &callback) } | |
end | |
private | |
def str_attr(str) | |
{ :data_type => "String", :string_value => str } | |
end | |
def do_all_the_time(&blk) | |
EM.defer { blk.call(); do_all_the_time(&blk) } | |
end | |
def send_message(msg, attributes, &callback) | |
resp = @sns.publish( | |
target_arn: @settings[:publishing_sns_topic_arn], | |
message: msg, | |
message_attributes: attributes | |
) | |
callback.call({ :response => resp }) if callback | |
end | |
# receive_messages returns an array of SQS messages | |
# for the consuming queue | |
def receive_messages | |
begin | |
resp = @sqs.receive_message( | |
queue_url: @settings[:consuming_sqs_queue_url], | |
wait_time_seconds: @settings[:wait_time_seconds], | |
max_number_of_messages: @settings[:max_number_of_messages], | |
) | |
resp.messages | |
rescue Aws::SQS::Errors::ServiceError => e | |
self.logger.info(e) | |
end | |
end | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment