@nickwallen
Last active May 19, 2017 21:40
  • Could also try the isolcpus kernel boot parameter to keep other tasks off the cores that fastcapa uses

Average number of messages per batch

cat fastcapa.err | grep "produce messageset with" | awk '{print $7}' | awk '{ s+=$1 } END { print s/NR }'

Average size of each batch

cat fastcapa.err | grep "produce messageset with" | awk '{print $9}' | sed "s/[(]//g" | awk '{ s+=$1 } END { printf "%.2f MB \n", (s/NR)/1000000 }'
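Both averages can also be computed in a single pass over the log. The following is a minimal sketch, not part of fastcapa: batch_stats is a hypothetical helper name, and it assumes the log lines look like "... produce messageset with <N> messages (<B> bytes)", which is what the field positions in the two commands above imply. It locates the count relative to the word "with" rather than by fixed field number, so it should tolerate small differences in the log prefix.

```shell
#!/bin/sh
# batch_stats: hypothetical helper, not part of fastcapa.
# Assumed line format: "... produce messageset with <N> messages (<B> bytes)"
batch_stats() {
  awk '
    /produce messageset with/ {
      for (i = 2; i < NF; i++) {
        if ($i == "with" && $(i - 1) == "messageset") {
          msgs += $(i + 1)          # message count follows "with"
          size = $(i + 3)           # looks like "(123456"
          gsub(/[()]/, "", size)    # strip the parentheses
          bytes += size
          n++
          break
        }
      }
    }
    END {
      if (n == 0) { print "no batches found"; exit 1 }
      printf "batches: %d\n", n
      printf "avg messages per batch: %.2f\n", msgs / n
      printf "avg batch size: %.2f MB\n", (bytes / n) / 1000000
    }
  ' "$@"
}

# Usage (reads the named file, or stdin when no file is given):
#   batch_stats fastcapa.err
```

Reporting the number of batches alongside the averages makes it easier to tell whether an average is based on a handful of batches or on thousands.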

As of 5/19 @ 8:41 AM EST, the batches seem too small even with the following settings and 1 Gbps of incoming traffic.

  • Average number of messages per batch: 1472.08
  • Average size of each batch: 0.7 MB

# Maximum number of messages allowed on the producer queue.
# Type: integer
# Default: 100000
queue.buffering.max.messages = 5000000

# Maximum time, in milliseconds, for buffering data on the producer queue.
# Type: integer
# Default: 1000
queue.buffering.max.ms = 5000

Fix 1

Increased the maximum time allowed to buffer (queue.buffering.max.ms) from 5000 ms to 20000 ms.

# Maximum time, in milliseconds, for buffering data on the producer queue.
# Type: integer
# Default: 1000
queue.buffering.max.ms = 20000

Good news: batches are definitely larger, by roughly 4x, in line with the 4x increase in buffering time.

  • Average number of messages per batch: 6501.51
  • Average size of each batch: 3.0 MB
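
As a quick sanity check, the before and after averages can be compared directly. This is only a sketch: compare_batches is a hypothetical helper name, and the inputs are the rounded averages reported in these notes, so the bytes-per-message figures are approximate.

```shell
#!/bin/sh
# compare_batches: hypothetical helper for comparing two sets of batch averages.
# Arguments: before_msgs before_mb after_msgs after_mb
compare_batches() {
  awk -v bm="$1" -v bmb="$2" -v am="$3" -v amb="$4" 'BEGIN {
    printf "messages per batch: %.1fx\n", am / bm
    printf "batch size: %.1fx\n", amb / bmb
    # MB here means 1,000,000 bytes, matching the batch size command above
    printf "bytes per message: %.0f before, %.0f after\n", bmb * 1000000 / bm, amb * 1000000 / am
  }'
}

# Averages with queue.buffering.max.ms = 5000, then with 20000
compare_batches 1472.08 0.7 6501.51 3.0
# messages per batch: 4.4x
# batch size: 4.3x
# bytes per message: 476 before, 461 after
```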

Fix 2

Need to increase the local message timeout and reduce the number of messages per batch.

# Local message timeout. This value is only enforced locally and limits the time a produced message
# waits for successful delivery. A time of 0 is infinite.
# Type: integer
# Default: 300000
message.timeout.ms = 900000

# How many times to retry sending a failing MessageSet. Note: retrying may cause reordering.
# Type: integer
message.send.max.retries = 5

# Maximum number of messages batched in one MessageSet. The total MessageSet size is
# also limited by message.max.bytes.
# Increase for better compression.
# Type: integer
batch.num.messages = 100000

# Maximum number of messages allowed on the producer queue.
# Type: integer
# Default: 100000
queue.buffering.max.messages = 1000000

# The backoff time in milliseconds before retrying a message send.
# Type: integer
# Default: 100
retry.backoff.ms = 500

# how often statistics are emitted; 0 = never
# Statistics emit interval. The application also needs to register a stats callback
# using rd_kafka_conf_set_stats_cb(). The granularity is 1000ms. A value of 0 disables statistics.
# Type: integer
# Default: 0
statistics.interval.ms = 0

[root@y138 ~]# fastcapa -l 8-15,24 --huge-dir /mnt/huge_1GB -- -t pcap_128_dryrun -c /etc/fastcapa/fastcapa.ycluster -b 192 -x 262144 -q 4 -r 4096
EAL: Detected 32 lcore(s)
EAL: Probing VFIO support...
EAL: PCI device 0000:01:00.0 on NUMA socket 0
EAL:   probe driver: 8086:1521 net_e1000_igb
EAL: PCI device 0000:01:00.1 on NUMA socket 0
EAL:   probe driver: 8086:1521 net_e1000_igb
EAL: PCI device 0000:09:00.0 on NUMA socket 0
EAL:   probe driver: 1137:43 net_enic
EAL: PCI device 0000:0a:00.0 on NUMA socket 0
EAL:   probe driver: 1137:43 net_enic
EAL: PCI device 0000:81:00.0 on NUMA socket 1
EAL:   probe driver: 8086:10fb net_ixgbe
EAL: PCI device 0000:81:00.1 on NUMA socket 1
EAL:   probe driver: 8086:10fb net_ixgbe
[ -t KAFKA_TOPIC ] defined as pcap_128_dryrun
[ -c KAFKA_CONFIG ] defined as /etc/fastcapa/fastcapa.ycluster
[ -b BURST_SIZE ] defined as 192
[ -x TX_RING_SIZE ] defined as 262144
[ -q NB_RX_QUEUE ] defined as 4
[ -r NB_RX_DESC ] defined as 4096
[ -p PORT_MASK ] undefined; defaulting to 0x01
USER1: Initializing port 0
USER1: Device setup successfully; port=0, mac=90 e2 ba d9 3c f9
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: config[kafka-global]: security.protocol = SASL_PLAINTEXT
USER1: config[kafka-global]: sasl.kerberos.keytab = /etc/security/keytabs/fastcapa.service.keytab
USER1: config[kafka-global]: sasl.kerberos.principal = fastcapa/y138.l42scl.hortonworks.com@EXAMPLE.COM
USER1: config[kafka-global]: metadata.broker.list = y134.l42scl.hortonworks.com:6667,y135.l42scl.hortonworks.com:6667,y136.l42scl.hortonworks.com:6667
USER1: config[kafka-global]: client.id = fastcapa-y138-enp129s0f1
USER1: config[kafka-global]: queue.buffering.max.messages = 5000000
USER1: config[kafka-global]: queue.buffering.max.ms = 20000
USER1: config[kafka-global]: compression.codec = snappy
USER1: config[kafka-global]: batch.num.messages = 100000
USER1: config[kafka-global]: message.max.bytes = 10000000
USER1: config[kafka-global]: message.send.max.retries = 5
USER1: config[kafka-global]: retry.backoff.ms = 500
USER1: config[kafka-global]: statistics.interval.ms = 5000
USER1: config[kafka-global]: socket.timeout.ms = 10000
USER1: config[kafka-global]: delivery.report.only.error = false
USER1: config[kafka-topic]: request.required.acks = 1
USER1: config[kafka-topic]: message.timeout.ms = 900000
USER1: Launching receive worker; worker=0, core=9, queue=0
USER1: Receive worker started; core=9, socket=1, queue=0 attempts=0
USER1: Launching receive worker; worker=1, core=10, queue=1
USER1: Launching receive worker; worker=2, core=11, queue=2
USER1: Receive worker started; core=10, socket=1, queue=1 attempts=0
USER1: Receive worker started; core=11, socket=1, queue=2 attempts=0
USER1: Launching receive worker; worker=3, core=12, queue=3
USER1: Launching transmit worker; worker=0, core=13 ring=0
USER1: Receive worker started; core=12, socket=1, queue=3 attempts=0
USER1: Transmit worker started; core=13, socket=1
USER1: Launching transmit worker; worker=1, core=14 ring=1
USER1: Transmit worker started; core=14, socket=1
USER1: Launching transmit worker; worker=2, core=15 ring=2
USER1: Transmit worker started; core=15, socket=1
USER1: Launching transmit worker; worker=3, core=24 ring=3
USER1: Transmit worker started; core=24, socket=1
USER1: Starting to monitor workers; core=8, socket=1


      ----- in -----  --- queued --- ----- out ----- ---- errs  ----
[nic]          590099               0               -               0
[rx]           590152               -          590152               0
[tx]           590153               -          590153               0
[kaf]          590154               0               0               0

...

@ 5/19 11:14 AM EST
      ----- in -----  --- queued --- ----- out ----- ---- errs  ----
[nic]       231579248               0               -               0
[rx]        231579259               -       231550535           28724
[tx]        231507984               -       231506063               0
[kaf]       231506064         2693288       228895194               0

@ 5/19 12:22 PM EST
      ----- in -----  --- queued --- ----- out ----- ---- errs  ----
[nic]      1018958834               0               -               0
[rx]       1018958885               -      1018930161           28724
[tx]       1018898398               -      1018898395               0
[kaf]      1018898399         3116235      1016162974               0

@ 5/19 1:14 PM EST
      ----- in -----  --- queued --- ----- out ----- ---- errs  ----
[nic]      1616686035               0               -               0
[rx]       1616686045               -      1616657321           28724
[tx]       1616628104               -      1616628102               0
[kaf]      1616628102         2578077      1614050129               0

@ 5/19 2:14 PM EST
      ----- in -----  --- queued --- ----- out ----- ---- errs  ----
[nic]      2310130356               0               -               0
[rx]       2310130366               -      2310101642           28724
[tx]       2310075006               -      2310073085               0
[kaf]      2310073085         2661682      2307596041               0
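
The snapshots above can be turned into rates and percentages with a little arithmetic. A minimal sketch follows; pps and pct are hypothetical helper names, and the inputs are copied from the 1:14 PM and 2:14 PM snapshots, which are assumed to be exactly 3600 seconds apart.

```shell
#!/bin/sh
# pps: packets per second between two counter snapshots.
# Arguments: earlier_count later_count elapsed_seconds
pps() {
  awk -v a="$1" -v b="$2" -v s="$3" 'BEGIN { printf "%.0f\n", (b - a) / s }'
}

# pct: one counter as a percentage of another.
# Arguments: part total
pct() {
  awk -v p="$1" -v t="$2" 'BEGIN { printf "%.4f\n", 100 * p / t }'
}

pps 1616686035 2310130356 3600   # [nic] in, 1:14 PM to 2:14 PM  -> 192623
pct 28724 2310130366             # [rx] errs vs [rx] in, 2:14 PM -> 0.0012
pct 2307596041 2310073085        # [kaf] out vs [kaf] in, 2:14 PM -> 99.8928
```

The [kaf] percentage is below 100 because messages still sitting in the producer queue have not been counted as out yet.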

Also validated that the number of messages hitting the topic matches the input rate. For example, the two offset totals below, taken 60 seconds apart, differ by 12,428,304 messages, which is roughly 207,000 packets per second.

[root@y136 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list y135:6667 \
  --topic pcap_128_dryrun \
  --security-protocol PLAINTEXTSASL \
  --time -1 | \
  grep pcap | awk -F: '{p+=$3} END {print p}' && \
  sleep 60 && \
  kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list y135:6667 \
  --topic pcap_128_dryrun \
  --security-protocol PLAINTEXTSASL \
  --time -1 | \
  grep pcap | awk -F: '{p+=$3} END {print p}'
...
15487907908
...
15500336212
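
The rate follows from the two totals by simple subtraction and division. The sketch below shows the arithmetic; sum_offsets and offset_rate are hypothetical helper names, and sum_offsets assumes the topic:partition:offset output format that the awk -F: command above already relies on.

```shell
#!/bin/sh
# sum_offsets: add up the latest offset of every partition.
# Expects lines of the form topic:partition:offset on stdin.
sum_offsets() {
  # printf avoids scientific notation for large totals in some awk variants
  awk -F: '{ p += $3 } END { printf "%.0f\n", p }'
}

# offset_rate: messages per second between two offset totals.
# Arguments: first_total second_total elapsed_seconds
offset_rate() {
  awk -v a="$1" -v b="$2" -v s="$3" 'BEGIN { printf "%.0f messages/s\n", (b - a) / s }'
}

# The two totals printed above, taken 60 seconds apart
offset_rate 15487907908 15500336212 60
# 207138 messages/s
```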