Azureblob
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "azure/storage"
require "base64"
require "securerandom"
require "set" # Set is used by list_sinceDbContainerEntities

# Reads events from Azure Blobs
class LogStash::Inputs::Azureblob < LogStash::Inputs::Base
  # Define the plugin name
  config_name "azureblob"

  # Codec
  # *Possible values available at https://www.elastic.co/guide/en/logstash/current/codec-plugins.html
  # *Most used: json_lines, line, etc.
  default :codec, "json_lines"

  # storage_account_name
  # *Define the Azure Storage Account Name
  config :storage_account_name, :validate => :string, :required => true

  # storage_access_key
  # *Define the Azure Storage Access Key (available through the portal)
  config :storage_access_key, :validate => :string, :required => true

  # container
  # *Define the container to watch
  config :container, :validate => :string, :required => true

  # sleep_time
  # *Define the sleep time between scans for new data
  config :sleep_time, :validate => :number, :default => 10, :required => false

  # [New]
  # path_prefix
  # *Define path prefixes within the container, so we don't ingest everything
  config :path_prefix, :validate => :array, :default => [""], :required => false

  # sincedb
  # *Define the Azure Storage Table where we keep track of the blobs we're collecting.
  # *Important! The sincedb entries are partitioned by the container we're watching.
  # *By default, we don't use the sincedb, but I recommend it if files get updated.
  config :sincedb, :validate => :string, :required => false

  # ignore_older
  # When the file input discovers a file that was last modified
  # before the specified timespan in seconds, the file is ignored.
  # After its discovery, if an ignored file is modified, it is no
  # longer ignored and any new data is read. The default is 24 hours.
  config :ignore_older, :validate => :number, :default => 24 * 60 * 60, :required => false

  # Choose where Logstash initially starts reading a blob: at the beginning or
  # at the end. The default behavior treats files like live streams and thus
  # starts at the end. If you have old data you want to import, set this
  # to 'beginning'.
  #
  # This option only modifies "first contact" situations where a file
  # is new and not seen before, i.e. files that don't have a current
  # position recorded in a sincedb file read by Logstash. If a file
  # has already been seen before, this option has no effect and the
  # position recorded in the sincedb file will be used.
  config :start_position, :validate => [ "beginning", "end" ], :default => "end", :required => false

  config :endpoint, :validate => :string, :default => "core.windows.net"

  # The number of entries that the system pulls from Azure as it walks the container. The
  # Azure default is 5000, but that tends to bog down a system, so this allows users to
  # set the number of files that the system deals with as it walks the container.
  config :batch_size, :validate => :number, :default => 250, :required => false

  # Define the container to move/archive files to. The archive container must be in the same
  # storage account as the main container. For users that have a large number of files in the
  # container, having to walk them all on each iteration can be time consuming. If this option
  # is specified (together with the archive_older option), then once a file is older than the
  # archive_older value it is moved from the active container to the archive container. This
  # keeps the number of files in the active container smaller.
  config :archive_container, :validate => :string, :required => false

  # The timespan in seconds used to decide whether a file should be archived. By default, if
  # the file has not been modified in 2 days (2 x ignore_older), archive it. We assume the file
  # has already been processed and is now being skipped by the ignore_older property.
  config :archive_older, :validate => :number, :default => 24 * 60 * 60 * 2, :required => false
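  # Example pipeline configuration (a minimal sketch; the account name, key, container,
  # prefixes and table name below are placeholders, not real values):
  #
  #   input {
  #     azureblob {
  #       storage_account_name => "mystorageaccount"
  #       storage_access_key   => "<base64-encoded access key>"
  #       container            => "logs"
  #       path_prefix          => ["app1/", "app2/"]
  #       sincedb              => "logstashsincedb"
  #       start_position       => "beginning"
  #     }
  #   }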
  def initialize(*args)
    super(*args)
  end # def initialize

  public
  def register
    # Azure.configure do |config|
    #   config.storage_account_name = @storage_account_name
    #   config.storage_access_key = @storage_access_key
    #   config.storage_table_host = "https://#{@storage_account_name}.table.core.windows.net"
    #   config.storage_blob_host = "https://#{@storage_account_name}.blob.#{@endpoint}"
    # end
    @azure_storage = Azure::Storage::Client.create(:storage_account_name => @storage_account_name, :storage_access_key => @storage_access_key)
    @azure_blob = @azure_storage.blob_client
    if (@sincedb)
      @azure_table = @azure_storage.table_client
      init_wad_table
    end
  end # def register

  # Initialize the WAD table if we have a sincedb defined.
  def init_wad_table
    if (@sincedb)
      begin
        @azure_table.create_table(@sincedb) # Make sure the table name is valid.
      rescue
        @logger.info("#{DateTime.now} Table #{@sincedb} already exists.")
      end
    end
  end # def init_wad_table

  # List the blob names in the container. If we have any path prefixes defined, they
  # filter the blob names from the list. The status of the blobs will be persisted
  # in the WAD table.
  #
  # Returns the list of blob_names to read from.
  def list_blobs
    blobs = Hash.new
    now_time = DateTime.now.new_offset(0)
    @logger.info("#{DateTime.now} Looking for blobs in #{container} container, #{path_prefix.length} paths (#{path_prefix.to_s})...")
    path_prefix.each do |prefix|
      continuation_token = nil
      loop do
        entries = @azure_blob.list_blobs(@container, { :timeout => 10, :max_results => batch_size, :marker => continuation_token, :prefix => prefix })
        entries.each do |entry|
          entry_last_modified = DateTime.parse(entry.properties[:last_modified]) # Normally in GMT 0
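          # DateTime subtraction yields a fractional number of days, so multiply by 86400 to get seconds.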
          elapsed_seconds = ((now_time - entry_last_modified) * 24 * 60 * 60).to_i
          if (elapsed_seconds <= @ignore_older)
            @logger.info("#{DateTime.now} Found possible file within elapsed time. #{container} - #{entry.name}")
            blobs[entry.name] = entry
          elsif (@archive_container && elapsed_seconds >= @archive_older)
            archive_blob(entry)
          end
        end
        continuation_token = entries.continuation_token
        break if continuation_token.nil? || continuation_token.empty?
      end
    end
    @logger.info("#{DateTime.now} Finished looking for blobs in #{container} container. #{blobs.length} blobs are queued as candidates with possible new data")
    return blobs
  end # def list_blobs

  def archive_blob(entry)
    @logger.info("#{DateTime.now} Moving file from #{container} to archive #{archive_container} - #{entry.name}")
    @azure_blob.copy_blob(archive_container, entry.name, container, entry.name)
    @azure_blob.delete_blob(container, entry.name)
  rescue => e
    @logger.error("#{DateTime.now} Caught exception while trying to archive the file. #{container} - #{entry.name}", :exception => e)
  end

  # Acquire the lock on the blob. Default duration is 60 seconds with a timeout of 10 seconds.
  # *blob_name: Blob name to treat
  # Returns true if acquiring the lease works
  def acquire_lock(blob_name)
    @azure_blob.create_page_blob(@container, blob_name, 512)
    @azure_blob.acquire_lease(@container, blob_name, { :duration => 60, :timeout => 10, :proposed_lease_id => SecureRandom.uuid })
    return true
  # Shutdown signal for graceful shutdown in LogStash
  rescue LogStash::ShutdownSignal => e
    raise e
  rescue => e
    @logger.error("#{DateTime.now} Caught exception while locking", :exception => e)
    return false
  end # def acquire_lock

  # Take the official lock on one of the blobs.
  # *blobs: Hash of blob entries to treat
  def lock_blob(blobs)
    # Take all the blobs without a lock file.
    real_blobs = blobs.select { |name, v| !name.end_with?(".lock") }
    # Return the first one not marked as locked, and lock it.
    real_blobs.each do |blob_name, blob|
      if !blobs.keys.include?(blob_name + ".lock")
        if acquire_lock(blob_name + ".lock")
          return blob
        end
      end
    end
    return nil
  end # def lock_blob
  def list_sinceDbContainerEntities
    entities = Set.new
    entities_since = (DateTime.now.new_offset('UTC') - (@ignore_older / 60 / 60 / 24.0)).strftime('%Y-%m-%dT%H:%M:%SZ')
    continuation_token = nil
    loop do
      filter = "PartitionKey eq '#{@container}' and Timestamp ge datetime'#{entities_since}'"
      entries = @azure_table.query_entities(@sincedb, { :filter => filter, :continuation_token => continuation_token })
      entries.each do |entry|
        entities << entry
      end
      continuation_token = entries.continuation_token
      break if continuation_token.nil? || continuation_token.empty?
    end
    return entities
  end # def list_sinceDbContainerEntities

  def search_since_db_entities(row_key)
    entries = @azure_table.query_entities(@sincedb, { :filter => "RowKey eq '#{row_key}'" })
    return entries.first
  end

  def get_since_db_entity(partition_key, row_key)
    @azure_table.get_entity(@sincedb, partition_key, row_key)
  end
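  # Sincedb table layout (one entity per blob), as built in process() below:
  #   PartitionKey => container name
  #   RowKey       => Base64-encoded blob name (presumably encoded because Azure Table
  #                   keys disallow characters such as '/')
  #   ByteOffset   => number of bytes already read from the blob
  #   ETag         => etag of the blob the last time we read it
  #   BlobName     => plain (un-encoded) blob name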
  # Process the blobs and start watching.
  def process(output_queue)
    blobs = list_blobs
    # Use the azure table in order to set the :start_range and :end_range.
    # When we do that, we shouldn't use the lock strategy, since we know where we are at.
    # It would still be interesting in a multi-threaded environment.
    if (@sincedb)
      existing_entities = list_sinceDbContainerEntities
      blobs.each do |blob_name, blob_info|
        blob_name_encoded = Base64.strict_encode64(blob_info.name)
        # foundEntity = search_since_db_entities(blob_name_encoded)
        # foundEntity = get_since_db_entity(@container, blob_name_encoded)
        entityIndex = existing_entities.find_index { |entity| entity.properties["RowKey"] == blob_name_encoded }
        entity = {
          "PartitionKey" => @container,
          "RowKey" => blob_name_encoded,
          "ByteOffset" => 0, # First contact, start_position is beginning by default
          "ETag" => nil,
          "BlobName" => blob_info.name
        }
        if (entityIndex)
          # Exists in table
          foundEntity = existing_entities.to_a[entityIndex]
          entity["ByteOffset"] = foundEntity.properties["ByteOffset"]
          entity["ETag"] = foundEntity.properties["ETag"]
          @logger.info("#{DateTime.now} Exists in table: #{blob_info.name}")
        elsif (@start_position === "end")
          # First contact
          entity["ByteOffset"] = blob_info.properties[:content_length]
          @logger.info("#{DateTime.now} First contact: #{blob_info.name} / #{blob_name_encoded}")
        end
        if (entity["ETag"] === blob_info.properties[:etag])
          # Nothing to do...
          # @logger.info("#{DateTime.now} Blob already up to date #{blob_info.name}")
          @logger.info("#{DateTime.now} Already up to date (e-tag): #{blob_info.name}")
        elsif entity["ByteOffset"] == blob_info.properties[:content_length]
          entity["ETag"] = blob_info.properties[:etag]
          @azure_table.insert_or_merge_entity(@sincedb, entity)
          @logger.info("#{DateTime.now} Content length match, update e-tag: #{blob_info.name}")
        else
          @logger.info("#{DateTime.now} Processing #{blob_info.name}")
          blob, content = @azure_blob.get_blob(@container, blob_info.name, { :start_range => entity["ByteOffset"], :end_range => blob_info.properties[:content_length] })
          @codec.decode(content) do |event|
            decorate(event) # We could also add the host name that read the blob to the event here.
            # event["host"] = hostname...
            output_queue << event
          end
          # Update the entity with the latest information we used while processing the blob.
          # If we crash, we will re-process the last batch.
          entity["ByteOffset"] = blob_info.properties[:content_length]
          entity["ETag"] = blob_info.properties[:etag]
          @azure_table.insert_or_merge_entity(@sincedb, entity)
        end
      end
    else
      # Process the ones not yet processed. (The newly locked blob)
      blob_info = lock_blob(blobs)
      # Do what we were doing before
      return if !blob_info
      @logger.info("#{DateTime.now} Processing #{blob_info.name}")
      blob, content = @azure_blob.get_blob(@container, blob_info.name)
      @codec.decode(content) do |event|
        decorate(event) # We could also add the host name that read the blob to the event here.
        # event["host"] = hostname...
        output_queue << event
      end
    end
  # Shutdown signal for graceful shutdown in LogStash
  rescue LogStash::ShutdownSignal => e
    raise e
  rescue => e
    @logger.error("#{DateTime.now} Oh my, an error occurred.", :exception => e)
  end # def process

  # Run the plugin (called directly by LogStash)
  public
  def run(output_queue)
    # Infinite processing loop.
    while !stop?
      process(output_queue)
      sleep sleep_time
    end # loop
  end # def run

  public
  def teardown
    # Nothing to do.
    @logger.info("Teardown")
  end # def teardown
end # class LogStash::Inputs::Azureblob