Script to get SQS messages, download the CloudTrail S3 files, and index them into Elasticsearch using the bulk API
# Ruby script to get messages from SQS containing information about CloudTrail json.gz files in S3.
# Every time a CloudTrail event occurs, AWS uploads the log in json.gz format to S3 and sends a notification to SQS.
# We use SQS to learn about new log files, download them from S3, and combine all records into one
# file for bulk importing into Elasticsearch, ready to be used with Kibana.
# Run this hourly or every 30 minutes.
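#
# Each SQS message body is an SNS envelope whose "Message" field holds the
# CloudTrail notification JSON. A minimal sketch of the shape this script
# expects (bucket/key values are illustrative placeholders):
#
#   {
#     "Timestamp": "2017-08-31T13:27:00Z",
#     "Message": "{\"s3Bucket\": \"example-trail-bucket\", \"s3ObjectKey\": [\"AWSLogs/123456789012/CloudTrail/ap-southeast-1/....json.gz\"]}"
#   }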
require 'aws-sdk'
require 'json'
require 'stringio'
require 'zlib'
Aws.config.update({
  region: 'ap-southeast-1'
})
SQS_URL = "https://sqs.ap-southeast-1.amazonaws.com/xxxxxxxxx/xxxxxxxxx"
SQS_ENDPOINT = "sqs.ap-southeast-1.amazonaws.com"
BASE="AWSLogs/xxxxxxxxxxx/CloudTrail/ap-southeast-1"
# Decompress a gzip byte string and return the inflated data
def gunzip(data)
  sio = StringIO.new(data)
  gz = Zlib::GzipReader.new(sio)
  read_data = gz.read
  gz.close
  read_data
end
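# e.g. gunzip(File.binread("/tmp/json.gz")) #=> the decompressed JSON string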
@s3_client = Aws::S3::Client.new(
  region: 'ap-southeast-1',
  access_key_id: 'xxxxxxxxxxxx',
  secret_access_key: 'xxxxxxxxxxxxxxxxx'
)
@sqs_client = Aws::SQS::Client.new(
  region: 'ap-southeast-1',
  access_key_id: 'xxxxxxxxxxxxxxxxx',
  secret_access_key: 'xxxxxxxxxxxxxxxxx'
)
resp = @sqs_client.receive_message({
  queue_url: SQS_URL,
  attribute_names: ["Policy", "VisibilityTimeout", "CreatedTimestamp"],
  message_attribute_names: ["MessageAttributeName"],
  max_number_of_messages: 10, # the maximum number of messages to return (max: 10)
  visibility_timeout: 60,     # how long (seconds) received messages stay hidden from subsequent ReceiveMessage requests
  wait_time_seconds: 1,
})
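# Note: one receive_message call returns at most 10 messages, so a busy trail
# may leave messages behind; they will be picked up on the next scheduled run.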
@json = []
@message_handlers = []
resp.messages.each do |message|
  body = JSON.parse(message.body)
  timestamp = body["Timestamp"]
  msg = JSON.parse(body["Message"])
  bucket = msg["s3Bucket"]
  key = msg["s3ObjectKey"][0]
  # Download the gzipped CloudTrail log file from S3 to a temp location
  @s3_client.get_object(
    response_target: '/tmp/json.gz',
    bucket: bucket,
    key: key
  )
  data = gunzip(File.binread("/tmp/json.gz"))
  # Keep the records and remember the receipt handle so the message can be deleted later
  if json_data = JSON.parse(data)["Records"]
    @json += json_data
    @message_handlers << message.receipt_handle
  end
end
if @json.empty?
  puts "Nothing in SQS"
  exit
end
@logstash_date = Time.now.strftime("%Y.%m.%d")
@all = File.open('all', 'w')
@json.each do |json|
  date = json["eventTime"]
  @all.write %Q{{ "index" : { "_index" : "logstash-#{@logstash_date}", "_type" : "fluentd", "_timestamp" : "#{date}" } }}
  @all.write "\n"
  @all.write json.to_json
  @all.write "\n"
end
@all.close
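# The resulting file is in the Elasticsearch bulk (NDJSON) format written above:
# an action/metadata line followed by the event source, e.g. (values illustrative):
#
#   { "index" : { "_index" : "logstash-2017.08.31", "_type" : "fluentd", "_timestamp" : "2017-08-31T13:00:00Z" } }
#   { "eventTime" : "2017-08-31T13:00:00Z", "eventName" : "DescribeInstances", ... }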
puts "-------------------"
puts "Processing #{@json.size} requests for #{@logstash_date}"
puts `curl -s -XPOST localhost:9200/_bulk --data-binary @all`
puts "Deleting #{@message_handlers.size} SQS messages"
@message_handlers.each do |handler|
  @sqs_client.delete_message({
    queue_url: SQS_URL,
    receipt_handle: handler
  })
end
puts "Done"