@simonwh · Created April 10, 2018 16:54
Azureblob
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "azure/storage"
require "base64"
require "securerandom"
require "set" # Set is used by list_sinceDbContainerEntities
# Reads events from Azure Blobs
class LogStash::Inputs::Azureblob < LogStash::Inputs::Base
  # Define the plugin name
  config_name "azureblob"
  # Codec
  # *Possible values available at https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
  # *Most used: json_lines, line, etc.
  default :codec, "json_lines"
  # storage_account_name
  # *Define the Azure Storage Account Name
  config :storage_account_name, :validate => :string, :required => true
  # storage_access_key
  # *Define the Azure Storage Access Key (available through the portal)
  config :storage_access_key, :validate => :string, :required => true
  # container
  # *Define the container to watch
  config :container, :validate => :string, :required => true
  # sleep_time
  # *Define the sleep_time between scanning for new data
  config :sleep_time, :validate => :number, :default => 10, :required => false
  # [New]
  # path_prefix
  # *Define the path prefix in the container in order to not take everything
  config :path_prefix, :validate => :array, :default => [""], :required => false
  # sincedb
  # *Define the Azure Storage Table where we can drop information about the blob we're collecting.
  # *Important! The sincedb will be on the container we're watching.
  # *By default, we don't use the sincedb, but I recommend it if files get updated.
  config :sincedb, :validate => :string, :required => false
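  # When sincedb is enabled, each table entry written by this plugin (see the process method
  # below) carries: PartitionKey (the container name), RowKey (the Base64-encoded blob name),
  # ByteOffset (how far into the blob we have read), ETag (the blob's etag at the last read)
  # and BlobName (the raw blob name).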
  # ignore_older
  # When the file input discovers a file that was last modified
  # before the specified timespan in seconds, the file is ignored.
  # After its discovery, if an ignored file is modified it is no
  # longer ignored and any new data is read. The default is 24 hours.
  config :ignore_older, :validate => :number, :default => 24 * 60 * 60, :required => false
  # Choose where Logstash starts initially reading blobs: at the beginning or
  # at the end. The default behavior treats files like live streams and thus
  # starts at the end. If you have old data you want to import, set this
  # to 'beginning'.
  #
  # This option only modifies "first contact" situations where a file
  # is new and not seen before, i.e. files that don't have a current
  # position recorded in a sincedb file read by Logstash. If a file
  # has already been seen before, this option has no effect and the
  # position recorded in the sincedb file will be used.
  config :start_position, :validate => [ "beginning", "end"], :default => "end", :required => false
  # endpoint
  # *Define the Azure storage endpoint suffix
  config :endpoint, :validate => :string, :default => "core.windows.net"
  # The number of entries that the system pulls from Azure as it walks the container. The
  # Azure default is 5000, but that tends to bog down a system, so this allows users to
  # set the number of files that the system deals with as it walks the container.
  config :batch_size, :validate => :number, :default => 250, :required => false
  # Define the container to move/archive files to. The archive container must be in the same
  # storage account as the main container. For users that have a large number of files in
  # the container, having to walk them all each iteration can be time consuming. If this option
  # is specified (along with the archive_older option) then once a file is older than the archive_older
  # value it is moved from the active container to the archive container. This keeps the number of
  # files in the active container smaller.
  config :archive_container, :validate => :string, :required => false
  # The timespan in seconds used to determine if the file should be archived. By default, if the file
  # has not been modified in 2 days (2 x ignore_older) it is archived. We are assuming that the file has
  # already been processed and is currently being skipped because of the ignore_older property.
  config :archive_older, :validate => :number, :default => 24 * 60 * 60 * 2, :required => false
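  # A minimal pipeline sketch showing how the options above fit together. The account name,
  # access key, container, prefixes and sincedb table name are placeholder values:
  #
  #   input {
  #     azureblob {
  #       storage_account_name => "mystorageaccount"
  #       storage_access_key   => "base64-access-key-from-the-portal"
  #       container            => "logs"
  #       path_prefix          => ["app1/", "app2/"]
  #       sincedb              => "logstashsincedb"
  #       start_position       => "beginning"
  #       codec                => "json_lines"
  #     }
  #   }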
  def initialize(*args)
    super(*args)
  end # def initialize

  public
  def register
    # Azure.configure do |config|
    #   config.storage_account_name = @storage_account_name
    #   config.storage_access_key = @storage_access_key
    #   config.storage_table_host = "https://#{@storage_account_name}.table.core.windows.net"
    #   config.storage_blob_host = "https://#{@storage_account_name}.blob.#{@endpoint}"
    # end
    @azure_storage = Azure::Storage::Client.create(:storage_account_name => @storage_account_name, :storage_access_key => @storage_access_key)
    @azure_blob = @azure_storage.blob_client
    if (@sincedb)
      @azure_table = @azure_storage.table_client
      init_wad_table
    end
  end # def register
  # Initialize the WAD Table if we have a sincedb defined.
  def init_wad_table
    if (@sincedb)
      begin
        @azure_table.create_table(@sincedb) # Be sure that the table name is properly named.
      rescue
        @logger.info("#{DateTime.now} Table #{@sincedb} already exists.")
      end
    end
  end # def init_wad_table
  # List the blob names in the container. If we have any path prefix defined, it will filter
  # the blob names from the list. The status of the blobs will be persisted in the WAD table.
  #
  # Returns the list of blob_names to read from.
  def list_blobs
    blobs = Hash.new
    now_time = DateTime.now.new_offset(0)
    @logger.info("#{DateTime.now} Looking for blobs in #{container} container, #{path_prefix.length} paths (#{path_prefix.to_s})...")
    path_prefix.each do |prefix|
      continuation_token = nil
      loop do
        entries = @azure_blob.list_blobs(@container, { :timeout => 10, :max_results => batch_size, :marker => continuation_token, :prefix => prefix})
        entries.each do |entry|
          entry_last_modified = DateTime.parse(entry.properties[:last_modified]) # Normally in GMT 0
          elapsed_seconds = ((now_time - entry_last_modified) * 24 * 60 * 60).to_i
          if (elapsed_seconds <= @ignore_older)
            @logger.info("#{DateTime.now} Found possible file within elapsed time. #{container} - #{entry.name}")
            blobs[entry.name] = entry
          elsif (@archive_container && elapsed_seconds >= @archive_older)
            archive_blob(entry)
          end
        end
        continuation_token = entries.continuation_token
        break if continuation_token.nil? || continuation_token.empty?
      end
    end
    @logger.info("#{DateTime.now} Finished looking for blobs in #{container} container. #{blobs.length} blobs are queued as possible candidates with new data")
    return blobs
  end # def list_blobs
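  # Move a blob that is older than archive_older into the archive container: the blob is
  # copied to archive_container under the same name and then deleted from the source container.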
  def archive_blob(entry)
    @logger.info("#{DateTime.now} Moving file from #{container} to archive #{archive_container} - #{entry.name}")
    @azure_blob.copy_blob(archive_container, entry.name, container, entry.name)
    @azure_blob.delete_blob(container, entry.name)
  rescue => e
    @logger.error("#{DateTime.now} Caught exception while trying to archive the file. #{container} - #{entry.name}", :exception => e)
  end
  # Acquire the lock on the blob. Default duration is 60 seconds with a timeout of 10 seconds.
  # *blob_name: Blob name to treat
  # Returns true if acquiring the lease works
  def acquire_lock(blob_name)
    @azure_blob.create_page_blob(@container, blob_name, 512)
    @azure_blob.acquire_lease(@container, blob_name, {:duration => 60, :timeout => 10, :proposed_lease_id => SecureRandom.uuid})
    return true
  # Shutdown signal for graceful shutdown in LogStash
  rescue LogStash::ShutdownSignal => e
    raise e
  rescue => e
    @logger.error("#{DateTime.now} Caught exception while locking", :exception => e)
    return false
  end # def acquire_lock
  # Do the official lock on the blob.
  # *blobs: Hash of blob name => blob entry to treat
  # For each data blob a companion "<blob name>.lock" page blob is created and leased;
  # a blob that already has a matching .lock entry in the listing is considered taken.
  def lock_blob(blobs)
    # Take all the blobs without a lock file.
    real_blobs = blobs.select { |name, v| !name.end_with?(".lock") }
    # Return the first one not marked as locked + lock it.
    real_blobs.each do |blob_name, blob|
      if !blobs.keys.include?(blob_name + ".lock")
        if acquire_lock(blob_name + ".lock")
          return blob
        end
      end
    end
    return nil
  end # def lock_blob
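  # Query the sincedb table for the entities of this container that were modified within the
  # ignore_older window. The filter built below looks like the following (values are
  # illustrative placeholders, not taken from a real run):
  #   PartitionKey eq 'logs' and Timestamp ge datetime'2018-04-09T16:54:00Z'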
  def list_sinceDbContainerEntities
    entities = Set.new
    entities_since = (DateTime.now.new_offset('UTC') - (@ignore_older / 60 / 60 / 24.0)).strftime('%Y-%m-%dT%H:%M:%SZ')
    continuation_token = nil
    loop do
      filter = "PartitionKey eq '#{@container}' and Timestamp ge datetime'#{entities_since}'"
      entries = @azure_table.query_entities(@sincedb, { :filter => filter, :continuation_token => continuation_token})
      entries.each do |entry|
        entities << entry
      end
      continuation_token = entries.continuation_token
      break if continuation_token.nil? || continuation_token.empty?
    end
    return entities
  end # def list_sinceDbContainerEntities
  # Look up a single sincedb entity by its RowKey (the Base64-encoded blob name).
  def search_since_db_entities(row_key)
    entries = @azure_table.query_entities(@sincedb, { :filter => "RowKey eq '#{row_key}'" })
    return entries.first
  end

  # Fetch a single sincedb entity by partition key (container) and row key.
  def get_since_db_entity(partition_key, row_key)
    @azure_table.get_entity(@sincedb, partition_key, row_key)
  end
  # Process one pass over the container: list the blobs and read any new data.
  def process(output_queue)
    blobs = list_blobs
    # Use the Azure table in order to set the :start_range and :end_range. When we do that,
    # we shouldn't use the lock strategy, since we know where we are at. It would still be
    # interesting in a multi-threaded environment.
    if (@sincedb)
      existing_entities = list_sinceDbContainerEntities
      blobs.each do |blob_name, blob_info|
        blob_name_encoded = Base64.strict_encode64(blob_info.name)
        # foundEntity = search_since_db_entities(blob_name_encoded)
        # foundEntity = get_since_db_entity(@container, blob_name_encoded)
        entityIndex = existing_entities.find_index { |entity| entity.properties["RowKey"] == blob_name_encoded }
        entity = {
          "PartitionKey" => @container,
          "RowKey" => blob_name_encoded,
          "ByteOffset" => 0, # First contact, start_position is beginning by default
          "ETag" => nil,
          "BlobName" => blob_info.name
        }
        if (entityIndex)
          # Exists in the table
          foundEntity = existing_entities.to_a[entityIndex]
          entity["ByteOffset"] = foundEntity.properties["ByteOffset"]
          entity["ETag"] = foundEntity.properties["ETag"]
          @logger.info("#{DateTime.now} Exists in table: #{blob_info.name}")
        elsif (@start_position === "end")
          # First contact
          entity["ByteOffset"] = blob_info.properties[:content_length]
          @logger.info("#{DateTime.now} First contact: #{blob_info.name} / #{blob_name_encoded}")
        end
        if (entity["ETag"] === blob_info.properties[:etag])
          # Nothing to do, the blob has not changed since the last read.
          # @logger.info("#{DateTime.now} Blob already up to date #{blob_info.name}")
          @logger.info("#{DateTime.now} Already up to date (e-tag): #{blob_info.name}")
        elsif entity['ByteOffset'] == blob_info.properties[:content_length]
          entity["ETag"] = blob_info.properties[:etag]
          @azure_table.insert_or_merge_entity(@sincedb, entity)
          @logger.info("#{DateTime.now} Content length match, update e-tag: #{blob_info.name}")
        else
          @logger.info("#{DateTime.now} Processing #{blob_info.name}")
          blob, content = @azure_blob.get_blob(@container, blob_info.name, { :start_range => entity["ByteOffset"], :end_range => blob_info.properties[:content_length] })
          @codec.decode(content) do |event|
            decorate(event) # We could also add the host name that read the blob to the event here.
            # event["host"] = hostname...
            output_queue << event
          end
          # Update the entity with the latest information we used while processing the blob. If we crash,
          # we will re-process the last batch.
          entity["ByteOffset"] = blob_info.properties[:content_length]
          entity["ETag"] = blob_info.properties[:etag]
          @azure_table.insert_or_merge_entity(@sincedb, entity)
        end
      end
    else
      # Process the ones not yet processed (the newly locked blob).
      blob_info = lock_blob(blobs)
      # Do what we were doing before
      return if !blob_info
      @logger.info("#{DateTime.now} Processing #{blob_info.name}")
      blob, content = @azure_blob.get_blob(@container, blob_info.name)
      @codec.decode(content) do |event|
        decorate(event) # We could also add the host name that read the blob to the event here.
        # event["host"] = hostname...
        output_queue << event
      end
    end
  # Shutdown signal for graceful shutdown in LogStash
  rescue LogStash::ShutdownSignal => e
    raise e
  rescue => e
    @logger.error("#{DateTime.now} Oh My, An error occurred.", :exception => e)
  end # def process
  # Run the plugin (Called directly by LogStash)
  public
  def run(output_queue)
    # Infinite processing loop.
    while !stop?
      process(output_queue)
      sleep sleep_time
    end # loop
  end # def run

  public
  def teardown
    # Nothing to do.
    @logger.info("Teardown")
  end # def teardown
end # class LogStash::Inputs::Azureblob