- Could also try the `isolcpus` kernel boot parameter to keep the capture cores isolated from the scheduler.
Average number of messages per batch
grep "produce messageset with" fastcapa.err | awk '{print $7}' | awk '{ s+=$1 } END { print s/NR }'
Average size of each batch
grep "produce messageset with" fastcapa.err | awk '{print $9}' | sed -e "s/[(]//g" | awk '{ s+=$1 } END { printf "%.2f MB \n", (s/NR)/1000000 }'
As of 5/19 @ 8:41 AM EST, even with the following settings with 1 gbps incoming, the batches seem too small.
- Average number of messages per batch: 1472.08
- Average size of each batch: 0.7 MB
# Maximum number of messages allowed on the producer queue.
# Type: integer
# Default: 100000
queue.buffering.max.messages = 5000000
# Maximum time, in milliseconds, for buffering data on the producer queue.
# Type: integer
# Default: 1000
queue.buffering.max.ms = 5000
Increased the max time allowed to buffer.
# Maximum time, in milliseconds, for buffering data on the producer queue.
# Type: integer
# Default: 1000
queue.buffering.max.ms = 20000
Batches are definitely larger. Good news
- Average number of messages per batch: 6501.51
- Average size of each batch: 3.0 MB
Need to increase the local message timeout and reduce the maximum number of messages allowed on the producer queue (queue.buffering.max.messages).
# Local message timeout. This value is only enforced locally and limits the time a produced message
# waits for successful delivery. A time of 0 is infinite.
# Type: integer
# Default: 300000
message.timeout.ms = 900000
# How many times to retry sending a failing MessageSet. Note: retrying may cause reordering.
# Type: integer
message.send.max.retries = 5
# Maximum number of messages batched in one MessageSet. The total MessageSet size is
# also limited by message.max.bytes.
# Increase for better compression.
# Type: integer
batch.num.messages = 100000
# Maximum number of messages allowed on the producer queue.
# Type: integer
# Default: 100000
queue.buffering.max.messages = 1000000
# The backoff time in milliseconds before retrying a message send.
# Type: integer
# Default: 100
retry.backoff.ms = 500
# how often statistics are emitted; 0 = never
# Statistics emit interval. The application also needs to register a stats callback
# using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. A value of 0 disables statistics.
# Type: integer
# Default: 0
statistics.interval.ms = 0
[root@y138 ~]# fastcapa -l 8-15,24 --huge-dir /mnt/huge_1GB -- -t pcap_128_dryrun -c /etc/fastcapa/fastcapa.ycluster -b 192 -x 262144 -q 4 -r 4096
EAL: Detected 32 lcore(s)
EAL: Probing VFIO support...
EAL: PCI device 0000:01:00.0 on NUMA socket 0
EAL: probe driver: 8086:1521 net_e1000_igb
EAL: PCI device 0000:01:00.1 on NUMA socket 0
EAL: probe driver: 8086:1521 net_e1000_igb
EAL: PCI device 0000:09:00.0 on NUMA socket 0
EAL: probe driver: 1137:43 net_enic
EAL: PCI device 0000:0a:00.0 on NUMA socket 0
EAL: probe driver: 1137:43 net_enic
EAL: PCI device 0000:81:00.0 on NUMA socket 1
EAL: probe driver: 8086:10fb net_ixgbe
EAL: PCI device 0000:81:00.1 on NUMA socket 1
EAL: probe driver: 8086:10fb net_ixgbe
[ -t KAFKA_TOPIC ] defined as pcap_128_dryrun
[ -c KAFKA_CONFIG ] defined as /etc/fastcapa/fastcapa.ycluster
[ -b BURST_SIZE ] defined as 192
[ -x TX_RING_SIZE ] defined as 262144
[ -q NB_RX_QUEUE ] defined as 4
[ -r NB_RX_DESC ] defined as 4096
[ -p PORT_MASK ] undefined; defaulting to 0x01
USER1: Initializing port 0
USER1: Device setup successfully; port=0, mac=90 e2 ba d9 3c f9
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: Launching receive worker; worker=0, core=9, queue=0
USER1: Receive worker started; core=9, socket=1, queue=0 attempts=0
USER1: Launching receive worker; worker=1, core=10, queue=1
USER1: Launching receive worker; worker=2, core=11, queue=2
USER1: Receive worker started; core=10, socket=1, queue=1 attempts=0
USER1: Receive worker started; core=11, socket=1, queue=2 attempts=0
USER1: Launching receive worker; worker=3, core=12, queue=3
USER1: Launching transmit worker; worker=0, core=13 ring=0
USER1: Receive worker started; core=12, socket=1, queue=3 attempts=0
USER1: Transmit worker started; core=13, socket=1
USER1: Launching transmit worker; worker=1, core=14 ring=1
USER1: Transmit worker started; core=14, socket=1
USER1: Launching transmit worker; worker=2, core=15 ring=2
USER1: Transmit worker started; core=15, socket=1
USER1: Launching transmit worker; worker=3, core=24 ring=3
USER1: Transmit worker started; core=24, socket=1
USER1: Starting to monitor workers; core=8, socket=1
----- in ----- --- queued --- ----- out ----- ---- errs ----
[nic] 590099 0 - 0
[rx] 590152 - 590152 0
[tx] 590153 - 590153 0
[kaf] 590154 0 0 0
...
@ 5/19 11:14 AM EST
----- in ----- --- queued --- ----- out ----- ---- errs ----
[nic] 231579248 0 - 0
[rx] 231579259 - 231550535 28724
[tx] 231507984 - 231506063 0
[kaf] 231506064 2693288 228895194 0
@ 5/19 12:22 PM EST
----- in ----- --- queued --- ----- out ----- ---- errs ----
[nic] 1018958834 0 - 0
[rx] 1018958885 - 1018930161 28724
[tx] 1018898398 - 1018898395 0
[kaf] 1018898399 3116235 1016162974 0
@ 5/19 1:14 PM EST
----- in ----- --- queued --- ----- out ----- ---- errs ----
[nic] 1616686035 0 - 0
[rx] 1616686045 - 1616657321 28724
[tx] 1616628104 - 1616628102 0
[kaf] 1616628102 2578077 1614050129 0
@ 5/19 2:14 PM EST
----- in ----- --- queued --- ----- out ----- ---- errs ----
[nic] 2310130356 0 - 0
[rx] 2310130366 - 2310101642 28724
[tx] 2310075006 - 2310073085 0
[kaf] 2310073085 2661682 2307596041 0
Also validated that the number of messages hitting the topic matches the input rate. For example, this shows roughly 207,000 packets per second.
[root@y136 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list y135:6667 \
--topic pcap_128_dryrun \
--security-protocol PLAINTEXTSASL \
--time -1 | \
grep pcap | awk -F: '{p+=$3} END {print p}' && \
sleep 60 && \
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list y135:6667 \
--topic pcap_128_dryrun \
--security-protocol PLAINTEXTSASL \
--time -1 | \
grep pcap | awk -F: '{p+=$3} END {print p}'
...
15487907908
...
15500336212