Skip to content

Instantly share code, notes, and snippets.

@RaulGracia
Last active September 16, 2020 08:46
Show Gist options
  • Save RaulGracia/5e6c06a09fd9f5be26f2b64f1c92b984 to your computer and use it in GitHub Desktop.
Save RaulGracia/5e6c06a09fd9f5be26f2b64f1c92b984 to your computer and use it in GitHub Desktop.
From 530c0ac8847720180f857ff58846d181a4cb3cec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ra=C3=BAl=20Gracia?= <raul.gracia@emc.com>
Date: Wed, 16 Sep 2020 10:41:41 +0200
Subject: [PATCH] Adapt Confluent Kafka driver to our benchmark methodology
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Signed-off-by: Raúl Gracia <raul.gracia@emc.com>
---
.../io/openmessaging/benchmark/Benchmark.java | 4 +-
.../benchmark/WorkloadGenerator.java | 21 ++++++++++
driver-kafka/deploy/deploy.yaml | 4 +-
driver-kafka/deploy/provision-kafka-aws.tf | 4 ++
.../deploy/templates/server.properties | 3 +-
driver-kafka/deploy/terraform.tfvars | 6 +--
driver-kafka/deploy/vars.yaml | 6 +++
driver-kafka/kafka.yaml | 40 +++++++++++++++++++
8 files changed, 80 insertions(+), 8 deletions(-)
create mode 100644 driver-kafka/deploy/vars.yaml
create mode 100644 driver-kafka/kafka.yaml
diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Benchmark.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Benchmark.java
index 9e6d64e..0f175cc 100644
--- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/Benchmark.java
+++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/Benchmark.java
@@ -151,8 +151,8 @@ public class Benchmark {
WorkloadGenerator generator = new WorkloadGenerator(driverConfiguration.name, workload, worker);
TestResult result = generator.run();
-
- String fileName = arguments.output.length() > 0 ? arguments.output
+ boolean useOutput = (arguments.output != null) && (arguments.output.length() > 0);
+ String fileName = useOutput? arguments.output
: String.format("%s-%s-%s.json", workloadName, driverConfiguration.name,
dateFormat.format(new Date()));
diff --git a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java
index 38abc92..64b6af7 100644
--- a/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java
+++ b/benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java
@@ -18,6 +18,7 @@
*/
package io.openmessaging.benchmark;
+import com.google.common.math.Stats;
import io.openmessaging.benchmark.utils.RandomGenerator;
import java.io.IOException;
import java.text.DecimalFormat;
@@ -463,6 +464,16 @@ public class WorkloadGenerator implements AutoCloseable {
throw new RuntimeException("Failed to collect aggregate latencies");
}
+ double aggPublishRate = Stats.meanOf(result.publishRate);
+ double aggConsumeRate = Stats.meanOf(result.consumeRate);
+
+ log.info(
+ "----- Aggregated Pub rate {} msg/s / {} MB/s | Cons rate {} msg/s / {} MB/s",
+ throughputFormat.format(aggPublishRate),
+ throughputFormat.format(aggPublishRate * workload.messageSize * 1e-6),
+ throughputFormat.format(aggConsumeRate),
+ throughputFormat.format(aggConsumeRate * workload.messageSize * 1e-6));
+
log.info(
"----- Aggregated Pub Latency (ms) avg: {} - 50%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}",
dec.format(microsToMillis(agg.publishLatency.getMean())),
@@ -473,6 +484,16 @@ public class WorkloadGenerator implements AutoCloseable {
dec.format(microsToMillis(agg.publishLatency.getValueAtPercentile(99.99))),
throughputFormat.format(microsToMillis(agg.publishLatency.getMaxValue())));
+ log.info(
+ "----- Aggregated E2E Latency (ms) avg: {} - 50%: {} - 95%: {} - 99%: {} - 99.9%: {} - 99.99%: {} - Max: {}",
+ dec.format(microsToMillis(agg.endToEndLatency.getMean())),
+ dec.format(microsToMillis(agg.endToEndLatency.getValueAtPercentile(50))),
+ dec.format(microsToMillis(agg.endToEndLatency.getValueAtPercentile(95))),
+ dec.format(microsToMillis(agg.endToEndLatency.getValueAtPercentile(99))),
+ dec.format(microsToMillis(agg.endToEndLatency.getValueAtPercentile(99.9))),
+ dec.format(microsToMillis(agg.endToEndLatency.getValueAtPercentile(99.99))),
+ throughputFormat.format(microsToMillis(agg.endToEndLatency.getMaxValue())));
+
result.aggregatedPublishLatencyAvg = microsToMillis(agg.publishLatency.getMean());
result.aggregatedPublishLatency50pct = microsToMillis(agg.publishLatency.getValueAtPercentile(50));
result.aggregatedPublishLatency75pct = microsToMillis(agg.publishLatency.getValueAtPercentile(75));
diff --git a/driver-kafka/deploy/deploy.yaml b/driver-kafka/deploy/deploy.yaml
index 7b2181f..71d0d86 100644
--- a/driver-kafka/deploy/deploy.yaml
+++ b/driver-kafka/deploy/deploy.yaml
@@ -80,7 +80,7 @@
dev: "{{ item }}"
with_items:
- "/dev/nvme1n1"
- - "/dev/nvme2n1"
+ - "/dev/nvme0n1"
- name: Mount disks
mount:
path: "{{ item.path }}"
@@ -90,7 +90,7 @@
state: mounted
with_items:
- { path: "/mnt/data-1", src: "/dev/nvme1n1" }
- - { path: "/mnt/data-2", src: "/dev/nvme2n1" }
+ - { path: "/mnt/data-2", src: "/dev/nvme0n1" }
- name: Store server lists
hosts: all
diff --git a/driver-kafka/deploy/provision-kafka-aws.tf b/driver-kafka/deploy/provision-kafka-aws.tf
index 630d72a..ac43f4a 100644
--- a/driver-kafka/deploy/provision-kafka-aws.tf
+++ b/driver-kafka/deploy/provision-kafka-aws.tf
@@ -214,3 +214,7 @@ output "zookeeper" {
output "prometheus_host" {
value = "${aws_instance.prometheus.0.public_ip}"
}
+
+output "client_ssh_host" {
+ value = "${aws_instance.client.0.public_ip}"
+}
diff --git a/driver-kafka/deploy/templates/server.properties b/driver-kafka/deploy/templates/server.properties
index 7df1891..6c8bfba 100644
--- a/driver-kafka/deploy/templates/server.properties
+++ b/driver-kafka/deploy/templates/server.properties
@@ -22,7 +22,8 @@ broker.id={{ brokerId }}
advertised.listeners=PLAINTEXT://{{ privateIp }}:9092
-log.dirs=/mnt/data-1,/mnt/data-2
+log.dirs=/mnt/data-1
+# Second data directory disabled: /mnt/data-2
zookeeper.connect={{ zookeeperServers }}
diff --git a/driver-kafka/deploy/terraform.tfvars b/driver-kafka/deploy/terraform.tfvars
index 855111a..60197a0 100644
--- a/driver-kafka/deploy/terraform.tfvars
+++ b/driver-kafka/deploy/terraform.tfvars
@@ -4,9 +4,9 @@ ami = "ami-9fa343e7" // RHEL-7.4
profile = "default"
instance_types = {
- "kafka" = "i3en.2xlarge"
- "zookeeper" = "t2.small"
- "client" = "c5n.2xlarge"
+ "kafka" = "i3.4xlarge"
+ "zookeeper" = "t3.small"
+ "client" = "c5.4xlarge"
"prometheus" = "c5.2xlarge"
}
diff --git a/driver-kafka/deploy/vars.yaml b/driver-kafka/deploy/vars.yaml
new file mode 100644
index 0000000..420d52a
--- /dev/null
+++ b/driver-kafka/deploy/vars.yaml
@@ -0,0 +1,6 @@
+---
+kafkaVersion: "2.6.0"
+
+zookeeperVersion: "3.5.5"
+prometheusVersion: "2.2.1"
+
diff --git a/driver-kafka/kafka.yaml b/driver-kafka/kafka.yaml
new file mode 100644
index 0000000..3ac2920
--- /dev/null
+++ b/driver-kafka/kafka.yaml
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+name: Kafka
+driverClass: io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver
+
+# Kafka client-specific configuration
+replicationFactor: 3
+
+topicConfig: |
+ min.insync.replicas=2
+
+commonConfig: |
+ bootstrap.servers=localhost:9092
+
+producerConfig: |
+ acks=all
+ linger.ms=1
+ batch.size=131072
+
+consumerConfig: |
+ auto.offset.reset=earliest
+ enable.auto.commit=false
--
2.17.1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment