Skip to content

Instantly share code, notes, and snippets.

@colinsurprenant
Created April 12, 2017 21:16
Show Gist options
  • Save colinsurprenant/b3ca535857a0e5d4ab48580fdfd653fe to your computer and use it in GitHub Desktop.
require "logstash/util/wrapped_acked_queue"
require "logstash/event"
require "logstash/instrument/namespaced_metric"
# Trim noisy frames (test rake tasks, bundled gems) from spec failure backtraces.
RSpec.configure do |config|
  [/rakelib\/test/, /vendor\/bundle/].each do |pattern|
    config.backtrace_exclusion_patterns << pattern
  end
  # config.order = :random
  # config.seed = 24307
end
# Concurrency stress spec for the file-backed acked queue: several writer
# threads push fixed-size events while several reader threads drain batches;
# the example then verifies every event was read, the queue closes cleanly,
# the on-disk layout looks sane, and the queue can be reopened.
describe LogStash::Util::WrappedAckedQueue do
# Fresh temp directory per example so each queue starts from empty disk state.
let(:path) { Stud::Temporary.directory }
context "with multiple writers" do
# Each writer pushes an equal share of the expected total.
let(:items) { expected_count / writers }
# Page size in bytes, expressed as a power of two.
let(:page_capacity) { 1 << page_capacity_multiplier }
# Overall queue size limit, as a multiple of the page size.
let(:queue_capacity) { page_capacity * queue_capacity_multiplier }
# Accumulates human-readable failure descriptions; the example passes only if empty.
let(:output_strings) { [] }
# Memoized helpers to omit from the failure dump (noisy / non-serializable values).
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }
let(:queue) do
# NOTE(review): the literal 0 is presumably "max unread events: unlimited" —
# confirm against create_file_based's parameter list.
described_class.create_file_based(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, queue_capacity)
end
# One thread per writer, each publishing its share of events through a shared write client.
let(:writer_threads) do
writer = queue.write_client
writers.times.map do |i|
# Loop-local values are passed as Thread.new args so each thread gets its own copies.
Thread.new(i, items, writer) do |i, items, writer|
publisher(items, writer)
end
end
end
# Flipped to true once all writers have joined; readers use it to decide when to stop.
let(:writers_finished) { Concurrent::AtomicBoolean.new(false) }
# One thread per reader; each records how many events it read into counts[its index].
let(:reader_threads) do
reader = queue.read_client
reader.set_batch_dimensions(batch_size, batch_wait)
reader.set_events_metric(metric.namespace([:stats, :events]))
reader.set_pipeline_metric(metric.namespace([:stats, :pipelines, :main, :events]))
readers.times.map do |i|
Thread.new(i, reader, counts) do |ii, readrrr, countss|
tally = 0
while true
batch = readrrr.read_batch
# Stop only when a batch comes back empty AND writers are done AND every
# pushed event has been acked — an empty batch alone may just be a timeout.
break if batch.size.zero? && writers_finished.value == true && queue.queue.is_fully_acked?
if simulate_work
# Simulate per-batch processing time (up to ~10ms).
sleep rand * 0.01
end
tally += batch.size
# Closing the batch acks its events back to the queue.
batch.close
end
countss[ii] = tally
# puts("reader #{ii}, tally=#{tally}, countss=#{countss.inspect}")
end
end
end
# Pushes `items` events through `writer`. The "sequence" field is left-justified
# to string_size so every event has a predictable serialized size; any push
# failure is printed rather than raised so other writers keep running.
def publisher(items, writer)
items.times.each do |i|
event = LogStash::Event.new("sequence" => "#{i}".ljust(string_size))
writer.push(event)
end
rescue => e
p :publisher_error => e
end
let(:collector) { LogStash::Instrument::Collector.new }
let(:metric) { LogStash::Instrument::Metric.new(collector) }
# The full write/read/close/reopen scenario, shared by every tuning context below.
shared_examples "a well behaved queue" do
it "writes, reads, closes and reopens" do
Thread.abort_on_exception = true
# force lazy initialization to avoid concurrency issues within threads
counts
# Start the threads
writer_threads
reader_threads
# Wait for all writers, signal the readers, then wait for the readers to drain.
writer_threads.each(&:join)
writers_finished.make_true
reader_threads.each(&:join)
# Any events still unread after the readers stop is a failure.
enqueued = queue.queue.unread_count
if enqueued != 0
output_strings << "unread events in queue: #{enqueued}"
end
# Sum of per-reader tallies must equal the number of events written.
got = counts.reduce(&:+)
if got != expected_count
# puts("count=#{counts.inspect}")
output_strings << "events read: #{got}"
end
sleep 0.1
expect { queue.close }.not_to raise_error
sleep 0.1
# After a clean close the data dir should hold exactly 2 files —
# presumably one page file plus its checkpoint; confirm against the queue impl.
files = Dir.glob(path + '/*').map{|f| f.sub("#{path}/", '')}
if files.count != 2
output_strings << "File count after close mismatch expected: 2 got: #{files.count}"
output_strings.concat files
end
# A fully-acked, closed queue must be reopenable from its on-disk state.
# rescue Exception is deliberate here: ANY reopen failure should be recorded
# as a spec failure message, not crash the example.
begin
queue.queue.open
rescue Exception => e
output_strings << e.message
end
queue.close
if output_strings.any?
# On failure, also dump the example's memoized state (minus noisy keys) to aid debugging.
output_strings << __memoized.reject{|k,v| reject_memo_keys.include?(k)}.inspect
end
expect(output_strings).to eq([])
end
end
# Default tuning; each context below overrides exactly one knob.
let(:writers) { 3 }
let(:readers) { 3 }
let(:simulate_work) { true }
# Pre-sized so each reader thread writes its tally to a distinct slot (supports up to 8 readers).
let(:counts) { Concurrent::Array.new([0, 0, 0, 0, 0, 0, 0, 0]) }
let(:page_capacity_multiplier) { 20 }
let(:queue_capacity_multiplier) { 128 }
let(:queue_checkpoint_acks) { 1024 }
let(:queue_checkpoint_writes) { 1024 }
let(:queue_checkpoint_interval) { 1000 }
let(:batch_size) { 500 }
let(:batch_wait) { 1000 }
let(:expected_count) { 60000 }
let(:string_size) { 256 }
# Matrix run 1: readers sleep briefly per batch to mimic downstream processing.
describe "with simulate_work ON" do
let(:simulate_work) { true }
context "> more writers than readers <" do
let(:writers) { 4 }
let(:readers) { 2 }
it_behaves_like "a well behaved queue"
end
context "> less writers than readers <" do
let(:writers) { 2 }
let(:readers) { 4 }
it_behaves_like "a well behaved queue"
end
context "> larger checkpoint acks <" do
let(:queue_checkpoint_acks) { 3000 }
it_behaves_like "a well behaved queue"
end
context "> smaller checkpoint acks <" do
let(:queue_checkpoint_acks) { 500 }
it_behaves_like "a well behaved queue"
end
context "> larger checkpoint writes <" do
let(:queue_checkpoint_writes) { 3000 }
it_behaves_like "a well behaved queue"
end
context "> smaller checkpoint writes <" do
let(:queue_checkpoint_writes) { 500 }
it_behaves_like "a well behaved queue"
end
context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }
it_behaves_like "a well behaved queue"
end
context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }
it_behaves_like "a well behaved queue"
end
context "> smaller batch wait <" do
let(:batch_wait) { 125 }
it_behaves_like "a well behaved queue"
end
context "> larger batch wait <" do
let(:batch_wait) { 5000 }
it_behaves_like "a well behaved queue"
end
context "> smaller event size <" do
let(:string_size) { 8 }
it_behaves_like "a well behaved queue"
end
context "> larger event size <" do
let(:string_size) { 8192 }
it_behaves_like "a well behaved queue"
end
context "> small queue size limit <" do
let(:queue_capacity_multiplier) { 10 }
it_behaves_like "a well behaved queue"
end
context "> very large queue size limit <" do
let(:queue_capacity_multiplier) { 512 }
it_behaves_like "a well behaved queue"
end
end
# Matrix run 2: same variations at full speed (no simulated processing delay).
describe "with simulate_work OFF" do
let(:simulate_work) { false }
context "> more writers than readers <" do
let(:writers) { 4 }
let(:readers) { 2 }
it_behaves_like "a well behaved queue"
end
context "> less writers than readers <" do
let(:writers) { 2 }
let(:readers) { 4 }
it_behaves_like "a well behaved queue"
end
context "> larger checkpoint acks <" do
let(:queue_checkpoint_acks) { 3000 }
it_behaves_like "a well behaved queue"
end
context "> smaller checkpoint acks <" do
let(:queue_checkpoint_acks) { 500 }
it_behaves_like "a well behaved queue"
end
context "> larger checkpoint writes <" do
let(:queue_checkpoint_writes) { 3000 }
it_behaves_like "a well behaved queue"
end
context "> smaller checkpoint writes <" do
let(:queue_checkpoint_writes) { 500 }
it_behaves_like "a well behaved queue"
end
context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }
it_behaves_like "a well behaved queue"
end
context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }
it_behaves_like "a well behaved queue"
end
context "> smaller batch wait <" do
let(:batch_wait) { 125 }
it_behaves_like "a well behaved queue"
end
context "> larger batch wait <" do
let(:batch_wait) { 5000 }
it_behaves_like "a well behaved queue"
end
context "> smaller event size <" do
let(:string_size) { 8 }
it_behaves_like "a well behaved queue"
end
context "> larger event size <" do
let(:string_size) { 8192 }
it_behaves_like "a well behaved queue"
end
context "> small queue size limit <" do
let(:queue_capacity_multiplier) { 10 }
it_behaves_like "a well behaved queue"
end
context "> very large queue size limit <" do
let(:queue_capacity_multiplier) { 512 }
it_behaves_like "a well behaved queue"
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment