Skip to content

Instantly share code, notes, and snippets.

@brycied00d
Last active March 10, 2020 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brycied00d/6c262aa821aca811d7c074b20cf5198c to your computer and use it in GitHub Desktop.
Save brycied00d/6c262aa821aca811d7c074b20cf5198c to your computer and use it in GitHub Desktop.
# fluentd-buffer-split.rb - Split a single large fluentd buffer chunk (of type "file") into smaller chunks that can be reintroduced manually and gradually, reducing load on processing pipelines.
# Quick and dirty script to split a fluentd buffer chunk into smaller chunks.
# Compatible with fluentd 1.9.0. (Much of the buffer/metadata code taken from fluentd source.)
# Copyright 2020 Bryce Chidester <bryce.chidester@calyptix.com>
require 'msgpack'
require 'time'
# Number of records collected before an output chunk is written; the last
# chunk written may contain fewer records.
NEW_CHUNK_SIZE = 100_000
# Writes +data+ as a stream of MessagePack objects to a new chunk file in the
# current working directory.
#
# The file is named "NEW_buffer.<state><chunk_id>.log".
#
# @param state [String, nil] state marker placed in the file name
# @param chunk_id [String] chunk id placed in the file name
# @param data [Enumerable] records to serialize, one MessagePack object each
# @return [String] the path of the file that was written
def flush_new_buffer(state, chunk_id, data)
  path = "NEW_buffer.#{state}#{chunk_id}.log"
  # The block form closes the handle even when packing raises. Mode 'wb'
  # already selects binary mode with ASCII-8BIT encoding, so no extra
  # set_encoding/binmode calls are needed.
  File.open(path, 'wb') do |io|
    io.sync = true
    packer = MessagePack::Packer.new(io)
    data.each { |record| packer.write(record) }
    packer.flush
  end
  path
end
# Builds a 16-byte binary id: the current UTC time in microseconds shifted
# left by 12 bits and OR-ed with a random value forms the first 8 bytes,
# followed by two random 32-bit words. All words are packed big-endian.
#
# @return [String] 16-byte binary string
def generate_unique_id
  timestamp = Time.now.utc
  micros = timestamp.to_i * 1000 * 1000 + timestamp.usec
  upper = (micros << 12) | rand(0xfff)
  words = [upper >> 32, upper & 0xffffffff, rand(0xffffffff), rand(0xffffffff)]
  words.pack('NNNN')
end
# Renders a binary id as a hexadecimal string (two hex digits per byte,
# high nibble first).
#
# @param unique_id [String] binary id
# @return [String] hex representation
def unique_id_to_str(unique_id)
  hex, = unique_id.unpack('H*')
  hex
end
# Two-byte marker expected at the start of a metadata file; parse_metadata
# checks for it and write_metadata emits it.
BUFFER_HEADER = "\xc1\x00".force_encoding(Encoding::ASCII_8BIT).freeze

# Reads a metadata file and decodes its MessagePack payload.
#
# Layout read by this method: BUFFER_HEADER (2 bytes), a 32-bit big-endian
# payload size (4 bytes), then the payload itself.
#
# @param in_file [String] path of the metadata file
# @return [Object, nil] the decoded payload (hash keys symbolized), or nil
#   when the file is 6 bytes or shorter or does not start with BUFFER_HEADER
def parse_metadata(in_file)
  # binread opens, reads and closes the file in one call, so no handle is
  # left open on any return path.
  raw = File.binread(in_file)
  puts "Metadata file size: #{raw.size}"
  if raw.size <= 6 # size of BUFFER_HEADER (2) + size of data size(4)
    puts "Failed to parse metadata file: #{in_file}"
    return nil
  end
  return nil unless raw.slice(0, 2) == BUFFER_HEADER

  size = raw.slice(2, 4).unpack('N').first
  puts "Metadata chunk size: #{size}"
  # The payload is fed explicitly, so the unpacker needs no backing IO.
  MessagePack::Unpacker.new(symbolize_keys: true).feed(raw.slice(6, size)).read
end
# Writes a metadata file for a newly created chunk.
#
# The given metadata is copied with its :id and :s entries replaced, packed
# with MessagePack, and written as BUFFER_HEADER + 32-bit big-endian payload
# size + payload.
#
# @param out_file [String] path of the metadata file to create
# @param metadata [Hash] metadata to base the new file on (not modified)
# @param unique_id [String] value stored under :id
# @param size [Integer] value stored under :s
# @return [nil]
def write_metadata(out_file, metadata, unique_id, size)
  new_metadata = metadata.merge(id: unique_id, s: size)
  bin = MessagePack::Packer.new.pack(new_metadata).full_pack
  # binwrite opens in binary mode and always closes the handle, even when
  # the write raises.
  File.binwrite(out_file, BUFFER_HEADER + [bin.bytesize].pack('N') + bin)
  nil
end
# Script body: split the chunk file named by the first command-line argument
# into new chunk files of at most NEW_CHUNK_SIZE records each. The metadata
# is read from "<chunk file>.meta" and reused for every new chunk (with a
# fresh id and record count).
in_file = ARGV.first
abort "Usage: #{$PROGRAM_NAME} <buffer-chunk-file>" if in_file.nil?

input_chunk_metadata = parse_metadata("#{in_file}.meta")
# parse_metadata returns nil for a short or unrecognized file; stop here with
# a clear message instead of failing on nil below.
abort "Unable to read metadata from #{in_file}.meta" if input_chunk_metadata.nil?

puts "Metadata timekey: [#{input_chunk_metadata[:timekey]}]"
puts "Metadata tag: [#{input_chunk_metadata[:tag]}]"
puts "Metadata variables: [#{input_chunk_metadata[:variables]}]"
puts "Metadata sequence: [#{input_chunk_metadata[:seq]}]"
puts "Metadata size: [#{input_chunk_metadata[:s]}] (records)"
puts "Metadata created: [#{Time.at(input_chunk_metadata[:c])}]"
puts "Metadata modified: [#{Time.at(input_chunk_metadata[:m])}]"

# //n switch means explicit 'ASCII-8BIT' pattern
_, state, chunk_id = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(in_file).to_a
puts "Extracted state: #{state}"
puts "Extracted chunk_id: #{chunk_id}"
if state.nil?
  warn "Warning: no state marker found in #{in_file}; output file names will not contain one"
end

total_count = 0
flushed_count = 0

# The block form closes the input file even if writing a chunk fails.
File.open(in_file, 'rb') do |input_chunk_io|
  input_chunk_unpacker = MessagePack::Unpacker.new(input_chunk_io)
  # each_slice yields full groups of NEW_CHUNK_SIZE records, then one smaller
  # group holding the remainder (if any).
  input_chunk_unpacker.to_enum(:each).each_slice(NEW_CHUNK_SIZE) do |records|
    total_count += records.size
    new_unique_id = generate_unique_id
    path = flush_new_buffer(state, unique_id_to_str(new_unique_id), records)
    write_metadata("#{path}.meta", input_chunk_metadata, new_unique_id, records.size)
    # Only a partial trailing group is reported as the "final" chunk.
    label = records.size < NEW_CHUNK_SIZE ? 'final' : 'new'
    puts "Writing #{label} chunk: #{path}"
    flushed_count += records.size
  end
end

puts "Total records: #{total_count}"
puts "Flushed records: #{flushed_count}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment