fluentd-buffer-check.rb - Validate each buffered record against some basic principles, identifying "bad"/"corrupt" records within buffer files.
#!/usr/bin/env ruby
# Simple script to "validate" all records in a fluentd buffer
# Sometimes something goes wonky in my environment: weird data ends up in the
# buffer file and then causes certain output plugins to choke. Ideally I'd
# catch this earlier with a fluentd filter, but in the meantime I'm left with
# buffer files holding a mix of good and bad data, and I need a tool to confirm
# what each buffer actually contains.
# Version 2 of this script will write the good records to a "good file" for
# re-inclusion back into fluentd, and a "bad file" recording the unprocessable
# entities. Additionally, it may attempt to fix common malformations I've seen,
# notably when a record inexplicably takes on another record as its child.
# (I still have no idea how this happens.)
# Record errors and their associated records are printed to stdout; buffer metadata is
# printed to stderr.
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <bryce.chidester@calyptix.com>
# What makes a good record? (Illustrative examples follow the list.)
# 0. Each record should be an Array.
# 1. Each record should have exactly 2 elements.
# 2. The first element should be numeric and look like a valid timestamp.
# 3. The second element should be a Hash.
# 3a. No member of the Hash should itself be a Hash.
# 3b. Each Hash key should be a String.
# 3c. Each Hash value should be a String, Numeric, or nil.
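#
# For illustration only (field names are made up), a record that passes every
# check looks like:
#   [1584230400, { "host" => "web-1", "status" => 200, "message" => "ok" }]
# and one that fails rule 3a, because another record is nested inside a value:
#   [1584230400, { "host" => "web-1", "child" => { "message" => "?" } }]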
require 'msgpack'
require 'pp'
require 'time'
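# A fluentd file-buffer .meta file begins with this 2-byte magic, followed by a
# 4-byte big-endian payload length and then the msgpack-encoded metadata map
# (see parse_metadata below).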
BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze
def parse_metadata(in_file)
  meta_chunk_io = File.open(in_file, 'rb')
  meta_chunk_io.set_encoding(Encoding::ASCII_8BIT)
  meta_chunk_io.sync = true
  meta_chunk_io.binmode
  metadata = meta_chunk_io.read
  warn "Metadata file size: #{metadata.size}"
  if metadata.size <= 6 # size of BUFFER_HEADER (2) + size of data size (4)
    warn "Failed to parse metadata file: #{in_file}"
    return nil
  end
  if metadata.slice(0, 2) == BUFFER_HEADER
    size = metadata.slice(2, 4).unpack('N').first
    warn "Metadata chunk size: #{size}"
    if size
      meta_chunk_unpacker = MessagePack::Unpacker.new(meta_chunk_io, symbolize_keys: true)
      return meta_chunk_unpacker.feed(metadata.slice(6, size)).read
    end
  end
  nil
end
def bad_record(msg, record)
  puts msg
  pp record
  # TODO: If "rewriting", this would write the invalid record to the rejects file.
end
in_file = ARGV.first
abort "Usage: #{$PROGRAM_NAME} <buffer chunk file>" if in_file.nil?
input_chunk_io = File.open(in_file, 'rb')
input_chunk_io.set_encoding(Encoding::ASCII_8BIT)
input_chunk_io.binmode
input_chunk_io.sync = true
input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)
input_chunk_metadata = parse_metadata("#{in_file}.meta")
abort "Unable to read metadata from #{in_file}.meta" if input_chunk_metadata.nil?
warn "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
warn "Metadata tag: [#{input_chunk_metadata[:tag]}]"
warn "Metadata variables: [#{input_chunk_metadata[:variables]}]"
warn "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
warn "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
warn "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
warn "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"
# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
warn "Extracted state: #{state}"
warn "Extracted chunk_id: #{chunk_id}"
total_count = 0
input_chunk_unpacker.each do |obj|
  total_count += 1
  unless obj.is_a? Array
    bad_record("#{total_count} Record is not an Array", obj)
    next
  end
  unless obj.count == 2
    bad_record("#{total_count} Record is wrong size", obj)
    next
  end
  unless obj.first.is_a? Numeric
    bad_record("#{total_count} Record timestamp is not numeric", obj)
    next
  end
  # Roughly 30 years of seconds: rejects timestamps earlier than ~year 2000.
  unless obj.first > 30 * 365.25 * 86400
    bad_record("#{total_count} Record timestamp is unusually old", obj)
    next
  end
  unless obj.first < Time.now.to_i
    bad_record("#{total_count} Record timestamp is in the future", obj)
    next
  end
  unless obj.last.is_a? Hash
    bad_record("#{total_count} Record data is not a Hash", obj)
    next
  end
  unless obj.last.keys.all? { |k| k.is_a? String }
    bad_record("#{total_count} Record data keys are not all Strings", obj)
    next
  end
  unless obj.last.values.all? { |v| v.is_a?(String) || v.is_a?(Numeric) || v.nil? }
    # TODO: Detect if it looks like another record is buried inside, and maybe
    # write that to a third file for re-analysis and reinclusion?
    bad_record("#{total_count} Record data values are not all valid types", obj)
    next
  end
  # TODO: If "rewriting", the valid record would be written to the "good file" here.
end
warn "Total records: #{total_count}"
input_chunk_io.close
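# Example usage (path is hypothetical):
#   ruby fluentd-buffer-check.rb /var/lib/fluentd/buffer/buffer.b59e...log
#
# To exercise the checker without a live fluentd, here's a minimal sketch that
# fabricates a chunk plus its .meta companion (filename and field values are
# made up; the layout simply mirrors what parse_metadata above expects):
#
#   require 'msgpack'
#   File.open('buffer.babc123.log', 'wb') do |f|
#     f.write([Time.now.to_i, { 'message' => 'good' }].to_msgpack)   # passes
#     f.write([Time.now.to_i, { 'message' => ['bad'] }].to_msgpack)  # fails 3c
#   end
#   meta = { timekey: nil, tag: 'test', variables: nil, seq: 0,
#            s: 2, c: Time.now.to_i, m: Time.now.to_i }.to_msgpack
#   File.open('buffer.babc123.log.meta', 'wb') do |f|
#     f.write("\xc1\x00".b + [meta.bytesize].pack('N') + meta)
#   end
#   # Then: ruby fluentd-buffer-check.rb buffer.babc123.log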