Skip to content

Instantly share code, notes, and snippets.

@fbricon
Created March 15, 2021 11:16
Show Gist options
  • Save fbricon/d54f22f59c01497c3ee6d4a52b064ac8 to your computer and use it in GitHub Desktop.
Save fbricon/d54f22f59c01497c3ee6d4a52b064ac8 to your computer and use it in GitHub Desktop.
Produce random Long messages to a local Kafka cluster
///usr/bin/env jbang "$0" "$@" ; exit $?
//DEPS info.picocli:picocli:4.5.0
//DEPS org.apache.kafka:kafka-clients:LATEST
import picocli.CommandLine;
import picocli.CommandLine.Command;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import java.util.concurrent.Callable;
/**
 * jbang/picocli command that publishes a fixed number of random {@code Long} values to a
 * Kafka topic on the broker configured in {@link #createProducer()}.
 *
 * <p>Each record is sent synchronously and its resulting partition and offset are printed to
 * standard output.
 */
@Command(name = "kafkaproducer", mixinStandardHelpOptions = true, version = "kafkaproducer 0.1",
        description = "kafkaproducer made with jbang")
class kafkaproducer implements Callable<Integer> {

    /**
     * Command-line entry point: runs the picocli command and exits with its exit code.
     *
     * @param args command-line arguments forwarded to picocli
     */
    public static void main(String... args) {
        int exitCode = new CommandLine(new kafkaproducer()).execute(args);
        System.exit(exitCode);
    }

    /**
     * Runs the producer once.
     *
     * @return {@code 0} when every record was sent
     * @throws Exception if sending a record fails or the thread is interrupted while waiting
     */
    @Override
    public Integer call() throws Exception {
        runProducer();
        return 0;
    }

    /**
     * Sends 20 records with random {@code Long} values to the {@code jbang-kafkaproducer} topic,
     * waiting for each send to complete before issuing the next one.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for a send result
     * @throws ExecutionException   if a send fails
     */
    void runProducer() throws InterruptedException, ExecutionException {
        String topicName = "jbang-kafkaproducer";
        int count = 20;
        // One generator for the whole run instead of a fresh instance per record.
        Random random = new Random();

        // try-with-resources closes the producer even when a send fails, releasing its
        // network resources and background I/O thread.
        try (Producer<Long, Long> producer = createProducer()) {
            for (int index = 0; index < count; index++) {
                // Use the loop index as the record key so the "key=" in the log line below
                // matches what is actually sent (previously the record had a null key).
                Long key = Long.valueOf(index);
                Long value = random.nextLong();
                ProducerRecord<Long, Long> record = new ProducerRecord<Long, Long>(topicName, key, value);
                // Block until the broker acknowledges the record so partition/offset are known.
                RecordMetadata metadata = producer.send(record).get();
                System.out.println("Record sent with key=" + index + " / value=" + value + " to partition "
                        + metadata.partition() + " with offset " + metadata.offset());
            }
        }
    }

    /**
     * Creates a producer for {@code Long} keys and values, pointed at {@code localhost:9092}
     * with client id {@code eclipse-kafka}.
     *
     * <p>The caller owns the returned producer and is responsible for closing it.
     *
     * @return a newly created Kafka producer
     */
    public static Producer<Long, Long> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "eclipse-kafka");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment