@jsvd
Created July 28, 2023 13:57
# encoding: utf-8
# JRUBY_OPTS="-J-Xmx4g -J-Xms4g" ruby beats_writer_ssl.rb
require "socket"
require "thread"
require "zlib"
require "json"
require "openssl"
Thread.abort_on_exception = true
HOST="127.0.0.1"
PORT=3333
CLIENT_CERT="/Users/joaoduarte/elastic/certificates/client_from_root.crt"
CLIENT_KEY="/Users/joaoduarte/elastic/certificates/client_from_root.key.pkcs8"

module Lumberjack
  SEQUENCE_MAX = (2**32 - 1).freeze

  class Client
    def initialize
      @sequence = 0
      @socket = connect
    end

    private

    def connect
      socket = TCPSocket.new(HOST, PORT)
      ctx = OpenSSL::SSL::SSLContext.new
      ctx.cert = OpenSSL::X509::Certificate.new(File.read(CLIENT_CERT))
      ctx.key = OpenSSL::PKey::RSA.new(File.read(CLIENT_KEY))
      ctx.ssl_version = :TLSv1_2
      # Wrap the socket with SSL/TLS
      ssl_socket = OpenSSL::SSL::SSLSocket.new(socket, ctx)
      ssl_socket.sync_close = true
      ssl_socket.connect
      ssl_socket
    end

    public

    def write(elements, chunk_size=5000, sleep_seconds=0.01)
      elements = [elements] if elements.is_a?(Hash)
      send_window_size(elements.size)
      payload = elements.map { |element| JsonEncoder.to_frame(element, inc) }.join
      send_payload(payload, chunk_size, sleep_seconds)
    end

    private

    def inc
      @sequence = 0 if @sequence + 1 > Lumberjack::SEQUENCE_MAX
      @sequence = @sequence + 1
    end

    private

    def send_window_size(size)
      @socket.syswrite(["2", "W", size].pack("AAN"))
    end

    private

    def send_payload(payload, chunk_size, sleep_seconds)
      payload_size = payload.size
      written = 0
      while written < payload_size
        written += @socket.syswrite(payload[written..written+chunk_size])
        puts "written #{written}.."
        sleep sleep_seconds
      end
    end

    public

    def close
      @socket.close
    end
  end

  module JsonEncoder
    def self.to_frame(hash, sequence)
      json = hash.to_json
      json_length = json.bytesize
      pack = "AANNA#{json_length}"
      frame = ["2", "J", sequence, json_length, json]
      frame.pack(pack)
    end
  end
end

client_count = 1500
message = 'a' * 8 * 16 * 1024   # 131072 bytes (128 KiB) per event
#require 'pry'
#binding.pry
puts "Connecting #{client_count} clients"
clients = client_count.times.map { Lumberjack::Client.new }
puts "Writing approximately #{(client_count * message.size) / 1024.0 / 1024.0}MiB across #{client_count} clients"
threads = client_count.times.map do |i|
  Thread.new(i) do |i|
    client = clients[i]
    # keep message size above 16k, requiring two TLS records
    data = [ { "message" => message } ]
    50.times do
      client.write(data)
      sleep 1 * rand
    end
    client.close
  end
end
threads.each(&:join)
puts "Done"
sleep 10
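
For reference, here is a minimal standalone sketch (plain Ruby, separate from the repro script above; the literal values are illustrative only) of the wire layout produced by JsonEncoder.to_frame: one byte of protocol version ("2"), one byte of frame type ("J"), a 32-bit big-endian sequence number, a 32-bit big-endian payload length, then the JSON bytes. The window frame sent by send_window_size is just "2", "W" and a 32-bit count.

# Illustrative only: round-trips a single "2J" frame using the same pack format as JsonEncoder.
require "json"

json  = { "message" => "hello" }.to_json
frame = ["2", "J", 1, json.bytesize, json].pack("AANNA#{json.bytesize}")

version, type, seq, len = frame.unpack("AANN")
body = frame.byteslice(10, len)   # header is 1 + 1 + 4 + 4 = 10 bytes
puts "version=#{version} type=#{type} seq=#{seq} len=#{len} body=#{body}"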
/tmp/logstash-8.9.0
❯ LS_JAVA_OPTS="-Dio.netty.allocator.numHeapArenas=0 -XX:NativeMemoryTracking=summary -Dio.netty.allocator.numDirectArenas=1 -XX:MaxDirectMemorySize=128m" bin/logstash -e "
input {
  beats {
    port => 3333
    enrich => none
    ssl => true
    ssl_certificate => '/Users/joaoduarte/elastic/certificates/client_from_root.crt'
    ssl_key => '/Users/joaoduarte/elastic/certificates/client_from_root.key.pkcs8'
    ssl_certificate_authorities => '/Users/joaoduarte/elastic/certificates/root.crt'
    ssl_client_authentication => 'required'
  }
}
filter {
  ruby {
    init => \"Thread.new { loop { puts Java::io.netty.buffer.ByteBufAllocator::DEFAULT.metric.toString(); sleep 5 } } \"
    code => \"\"
  }
}
output { null { } }" -w 1 -b 1
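
The JVM options above are chosen to hit the out-of-direct-memory path quickly: pooled heap arenas are disabled, Netty gets a single direct arena, and direct memory is capped at 128 MiB while the allocator keeps its default 4 MiB chunk size (the chunkSize reported in the PooledByteBufAllocatorMetric lines below). A rough back-of-the-envelope check, in plain Ruby, using only numbers that appear in the flags and the log output:

# Rough arithmetic only; the cap and chunk size are taken from the command line and the metric lines.
max_direct = 128 * 1024 * 1024   # -XX:MaxDirectMemorySize=128m
chunk_size = 4 * 1024 * 1024     # chunkSize: 4194304 reported by PooledByteBufAllocatorMetric
puts max_direct / chunk_size     # => 32 chunks available to the single direct arena
puts 31 * chunk_size             # => 130023424, the "used" value logged once allocation stops growing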
Using bundled JDK: /tmp/logstash-8.9.0/jdk.app/Contents/Home
Sending Logstash logs to /tmp/logstash-8.9.0/logs which is now configured via log4j2.properties
[2023-07-28T14:55:53,810][INFO ][logstash.runner ] Log4j configuration path used is: /tmp/logstash-8.9.0/config/log4j2.properties
[2023-07-28T14:55:53,814][WARN ][logstash.runner ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2023-07-28T14:55:53,814][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"8.9.0", "jruby.version"=>"jruby 9.3.10.0 (2.6.8) 2023-02-01 107b2e6697 OpenJDK 64-Bit Server VM 17.0.7+7 on 17.0.7+7 +indy +jit [arm64-darwin]"}
[2023-07-28T14:55:53,815][INFO ][logstash.runner ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Dio.netty.allocator.numHeapArenas=0, -XX:NativeMemoryTracking=summary, -Dio.netty.allocator.numDirectArenas=1, -XX:MaxDirectMemorySize=128m, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED]
[2023-07-28T14:55:53,831][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
/private/tmp/logstash-8.9.0/vendor/bundle/jruby/2.6.0/gems/sinatra-2.2.4/lib/sinatra/base.rb:938: warning: constant Tilt::Cache is deprecated
[2023-07-28T14:55:54,067][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2023-07-28T14:55:54,186][INFO ][org.reflections.Reflections] Reflections took 44 ms to scan 1 urls, producing 132 keys and 464 values
[2023-07-28T14:55:54,277][WARN ][logstash.inputs.beats ] You are using a deprecated config setting "ssl" set in beats. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Use 'ssl_enabled' instead. If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"ssl", :plugin=><LogStash::Inputs::Beats ssl_certificate=>"/Users/joaoduarte/elastic/certificates/client_from_root.crt", ssl_client_authentication=>"required", ssl_key=>"/Users/joaoduarte/elastic/certificates/client_from_root.key.pkcs8", port=>3333, enrich=>["none"], id=>"812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb", ssl=>true, ssl_certificate_authorities=>["/Users/joaoduarte/elastic/certificates/root.crt"], enable_metric=>true, debug=>false, codec=><LogStash::Codecs::Plain id=>"plain_a39ca8af-f3a5-467d-a33b-7392b8dad781", enable_metric=>true, charset=>"UTF-8">, host=>"0.0.0.0", ssl_enabled=>false, ssl_verify_mode=>"none", ssl_peer_metadata=>false, include_codec_tag=>true, ssl_handshake_timeout=>10000, ssl_cipher_suites=>["TLS_AES_256_GCM_SHA384", "TLS_AES_128_GCM_SHA256", "TLS_CHACHA20_POLY1305_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256", "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256"], ssl_supported_protocols=>["TLSv1.2", "TLSv1.3"], client_inactivity_timeout=>60, executor_threads=>10, add_hostname=>false, tls_min_version=>1, tls_max_version=>1.3>}
[2023-07-28T14:55:54,321][INFO ][logstash.javapipeline ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2023-07-28T14:55:54,328][WARN ][logstash.javapipeline ][main] 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 0; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 0; chunkSize: 4194304)
[2023-07-28T14:55:54,361][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>1, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x262d8cc@/private/tmp/logstash-8.9.0/logstash-core/lib/logstash/java_pipeline.rb:134 run>"}
[2023-07-28T14:55:54,578][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.22}
[2023-07-28T14:55:54,585][INFO ][logstash.inputs.beats ][main] Starting input listener {:address=>"0.0.0.0:3333"}
[2023-07-28T14:55:54,859][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
[2023-07-28T14:55:54,866][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-07-28T14:55:54,879][INFO ][org.logstash.beats.Server][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Starting server on port: 3333
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304)
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304)
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304)
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304)
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304)
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 130023424; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304)
[2023-07-28T14:56:27,882][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 106825184, ratio: 82
[2023-07-28T14:56:27,886][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 106976992, ratio: 82
[2023-07-28T14:56:27,884][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0xd4882a85, L:/127.0.0.1:3333 - R:/127.0.0.1:55498] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:27,944][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x36b863a5, L:/127.0.0.1:3333 - R:/127.0.0.1:55411] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:28,477][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 107220064, ratio: 82
[2023-07-28T14:56:28,479][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x03117f6f, L:/127.0.0.1:3333 - R:/127.0.0.1:55880] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:29,069][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 107207776, ratio: 82
[2023-07-28T14:56:29,115][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x13a4bfeb, L:/127.0.0.1:3333 - R:/127.0.0.1:55419] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:32,060][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 107142240, ratio: 82
[2023-07-28T14:56:32,061][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x14d57f24, L:/127.0.0.1:3333 - R:/127.0.0.1:56154] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:34,493][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109245488, ratio: 84
[2023-07-28T14:56:34,494][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x36b863a5, L:/127.0.0.1:3333 - R:/127.0.0.1:55411] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:35,112][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109215792, ratio: 84
[2023-07-28T14:56:35,114][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x7ef85f80, L:/127.0.0.1:3333 - R:/127.0.0.1:55612] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:39,345][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109069360, ratio: 84
[2023-07-28T14:56:39,390][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x0afee5ea, L:/127.0.0.1:3333 - R:/127.0.0.1:55462] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
[2023-07-28T14:56:40,526][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109685392, ratio: 84
[2023-07-28T14:56:40,528][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0xc3888ed5, L:/127.0.0.1:3333 - R:/127.0.0.1:55670] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash
^C[2023-07-28T14:56:40,698][WARN ][logstash.runner ] SIGINT received. Shutting down.
^C[2023-07-28T14:56:40,890][FATAL][logstash.runner ] SIGINT received. Terminating immediately..
[2023-07-28T14:56:40,912][FATAL][org.logstash.Logstash ]
org.jruby.exceptions.ThreadKill: null
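
The "ratio" field reported by OOMConnectionCloser appears to be pinned bytes as a percentage of used direct memory; the logged values are consistent with that reading (plain Ruby arithmetic, numbers copied from the status lines above):

# Values copied verbatim from the 14:56:27 and 14:56:34 "Direct memory status" lines.
used = 130023424
puts 106825184 * 100 / used   # => 82, the ratio logged at 14:56:27
puts 109245488 * 100 / used   # => 84, the ratio logged at 14:56:34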