Skip to content

Instantly share code, notes, and snippets.

@mtranter
Last active October 10, 2022 03:02
Show Gist options
  • Save mtranter/3c84e65259c2a0fcc969d5b0f905378b to your computer and use it in GitHub Desktop.
Save mtranter/3c84e65259c2a0fcc969d5b0f905378b to your computer and use it in GitHub Desktop.
Producer per message tests
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:7.2.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
control-center:
image: confluentinc/cp-enterprise-control-center:7.2.1
hostname: control-center
container_name: control-center
depends_on:
- broker
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
# CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
# CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
# CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
# CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
// See https://aka.ms/new-console-template for more information
using System.Text;
using Confluent.Kafka;
class Program
{
class ByteArraySerializer : ISerializer<byte[]>
{
public byte[] Serialize(byte[] data, SerializationContext context)
=> data;
}
public static void Main(string[] args)
{
var producerConfig = new ProducerConfig()
{
Acks = Acks.All,
ClientId = "PublicMemberApi",
BootstrapServers = "localhost:9092"
};
var producerBuilder =
new ProducerBuilder<byte[], byte[]>(producerConfig)
.SetValueSerializer(new ByteArraySerializer())
.SetKeySerializer(new ByteArraySerializer());
// var startOne = DateTime.Now;
// for (var i = 0; i < 1000; i++)
// {
// var msg = Encoding.UTF8.GetBytes(i.ToString());
// using (var p = producerBuilder.Build())
// {
// p.Produce("test-topic", new Message<byte[], byte[]>() { Key = msg, Value = msg });
// p.Flush();
// }
// }
// var endOne = DateTime.Now;
// var totalTime = endOne - startOne;
// Console.WriteLine($"New producer each time took {totalTime.TotalMilliseconds}ms");
using (var p = producerBuilder.Build())
{
var startTwo = DateTime.Now;
for (var i = 0; i < 1000; i++)
{
var msg = Encoding.UTF8.GetBytes(i.ToString());
{
p.Produce("test-topic", new Message<byte[], byte[]>() { Key = msg, Value = msg });
}
}
p.Flush();
var endTwo = DateTime.Now;
var totalTimeTwo = endTwo - startTwo;
Console.WriteLine($"Same producer each time took {totalTimeTwo.TotalMilliseconds}ms");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment