Skip to content

Instantly share code, notes, and snippets.

@trekr5
Last active September 28, 2015 14:52
Show Gist options
  • Save trekr5/b6433559e8f892747a8b to your computer and use it in GitHub Desktop.
Save trekr5/b6433559e8f892747a8b to your computer and use it in GitHub Desktop.
s3 plugin
def aws_s3_config
@endpoint_region == 'us-east-1' ? @endpoint_region = 's3.amazonaws.com' : @endpoint_region = 's3-'+@endpoint_region+'.amazonaws.com'
@logger.info("Registering s3 output", :bucket => @bucket, :endpoint_region => @endpoint_region)
AWS.config(
:access_key_id => @access_key_id,
:secret_access_key => @secret_access_key,
:s3_endpoint => @endpoint_region
)
@s3 = AWS::S3.new
end
def time_alert(interval)
Thread.new do
loop do
start_time = Time.now
yield
elapsed = Time.now - start_time
sleep([interval - elapsed, 0].max)
end
end
end
# this method is used for write files on bucket. It accept the file and the name of file.
def write_on_bucket (file_data, file_basename, temp_directory)
# if you lose connection with s3, bad control implementation.
if ( @s3 == nil)
aws_s3_config
end
# find and use the bucket
bucket = @s3.buckets[@bucket]
@logger.debug "S3: ready to write "+file_basename+" in bucket "+@bucket+", Fire in the hole!"
# prepare for write the file
#key = "#{pass_day}/#{file_basename}" can be used to create folders within an s3 bucket
object = bucket.objects[file_basename]
object.write(:file => file_data, :acl => @canned_acl)
@logger.debug "S3: has written successfully "+file_basename+" in bucket "+@bucket + " with canned ACL \"" + @canned_acl + "\""
if File.exists?("#{temp_directory}#{file_basename}")
@logger.debug "File exists #{temp_directory}#{file_basename}"
else
@logger.warn "File #{temp_directory}#{file_basename} does not exist"
#exit
end
end
# this method is used for create new path for name the file
def getFinalPath
@pass_time = Time.now
return @temp_directory+"ls.s3."+Socket.gethostname+"."+(@pass_time).strftime("%Y-%m-%dT%H.%M")
end
# This method is used for restore the previous crash of logstash or to prepare the files to send in bucket.
# Take two parameter: flag and name. Flag indicate if you want to restore or not, name is the name of file
def upFile(flag, name)
# Dir[@temp_directory+name].each do |file|
Dir[@temp_directory+name].each do |file|
name_file = File.basename(file)
@logger.debug "name of file in temporary directory: #{name_file}"
#name_file1 = name_file.gsub!(".txt", ".json")
name_file1 = name_file.gsub!(/\.txt/, ".json")
if (flag == true)
@logger.warn "S3: have found temporary file: "+name_file1+", something has crashed before... Prepare for upload in bucket!"
end
if (!File.zero?(file))
# @pass_day = Time.now.strftime("%Y-%m-%d")
# @temp_directory = next_day_check(@pass_day)
write_on_bucket(file, name_file1, @temp_directory)
if (flag == true)
@logger.debug "S3: file: "+name_file1+" restored on bucket "+@bucket
else
@logger.debug "S3: file: "+name_file1+" was put on bucket "+@bucket
end
end
File.delete(file)
#File.delete(name_file1)
end
end
# This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.
#binding.pry
def newFile (flag)
if (flag == true)
@current_final_path = getFinalPath
#@extension = ".json"
@sizeCounter = 0
end
if (@tags.size != 0)
@tempFile = File.new(@current_final_path+".tag_"+@tag_path+"part"+@sizeCounter.to_s+".txt", "w")
else
#@tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+@extension, "w")
@tempFile = File.new(@current_final_path+".part"+@sizeCounter.to_s+".txt", "w")
end
end
public
def register
require 'aws-sdk'
@pass_day = Time.now.strftime("%Y-%m-%d")
@logger.debug "today is #{@pass_day}..."
current_temp_dir = "/opt/logstash"
Dir.mkdir("#{current_temp_dir}/s3_temp") unless File.exists?("#{current_temp_dir}/s3_temp")
@logger.debug "current temp dir is #{current_temp_dir}...."
@temp_directory = File.join("#{current_temp_dir}", "s3_temp/#{@pass_day}/")
# @today_directory = File.join(@temp_directory, "#{@current_final_path}/")
if (@tags.size != 0)
@tag_path = ""
for i in (0..@tags.size-1)
@tag_path += @tags[i].to_s+"."
end
end
if !(File.directory? @temp_directory)
@logger.debug "S3: Temp Directory "+@temp_directory+" doesn't exist, let's make it!"
Dir.mkdir(@temp_directory)
@logger.debug "today's directory is #{@temp_directory}..."
else
@logger.debug "S3: Temp Directory "+@temp_directory+" exist, nothing to do"
# @logger.debug "S3: Current Directory "+@today_directory+" exist, nothing to do"
end
if (@restore == true )
@logger.debug "S3: is attempting to verify previous crashes..."
upFile(true, "*.txt")
#upFile(true, "*.json")
end
newFile(true)
if (time_file != 0)
first_time = true
@thread = time_alert(@time_file*60) do
if (first_time == false)
@logger.debug "S3: time_file triggered, let's bucket the file if dosen't empty and create new file "
upFile(false, File.basename(@tempFile))
newFile(true)
else
first_time = false
end
end
end
end
public
def receive(event)
return unless output?(event)
# Prepare format of Events
if (@format == "plain")
message = self.class.format_message(event)
elsif (@format == "json")
message = event.to_json
else
message = event.to_s
end
if(time_file !=0)
@logger.debug "S3: trigger files after "+((@pass_time+60*time_file)-Time.now).to_s
end
# if specific the size
if(size_file !=0)
if (@tempFile.size <= @size_file )
@logger.debug "S3: File have size: "+@tempFile.size.to_s+" and size_file is: "+ @size_file.to_s
@logger.debug "S3: put event into: "+File.basename(@tempFile)
# Put the event in the file, now!
File.open(@tempFile, 'a') do |file|
file.puts message
#file.write "\n" #add new lines in event file
end
else
@logger.debug "S3: file: "+File.basename(@tempFile)+" is too large, let's bucket it and create new file"
upFile(false, File.basename(@tempFile))
@sizeCounter += 1
newFile(false)
end
# else we put all in one file
else
@logger.debug "S3: put event into "+File.basename(@tempFile)
File.open(@tempFile, 'a') do |file|
file.puts message
# file.write "\n"
end
end
end
def self.format_message(event)
message = "Date: #{event["@timestamp"]}\n"
message << "Source: #{event["source"]}\n"
message << "Tags: #{event["tags"].join(', ')}\n"
message << "Fields: #{event.to_hash.inspect}\n"
message << "Message: #{event["message"]}"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment