Last active
November 29, 2023 17:31
-
-
Save mmacphail/777dcf45045bacd7804ba13f88d9d4da to your computer and use it in GitHub Desktop.
Ideal java producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package clients; | |
import io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor; | |
import java.io.IOException; | |
import java.nio.charset.Charset; | |
import java.nio.file.Files; | |
import java.nio.file.Paths; | |
import java.util.List; | |
import java.util.Properties; | |
import org.apache.kafka.clients.producer.KafkaProducer; | |
import org.apache.kafka.clients.producer.ProducerConfig; | |
import org.apache.kafka.clients.producer.ProducerRecord; | |
import org.apache.kafka.common.serialization.StringSerializer; | |
public class Producer { | |
static final String DRIVER_FILE_PREFIX = "./drivers/"; | |
static final String KAFKA_TOPIC = "driver-positions"; | |
/** | |
* Java producer. | |
*/ | |
public static void main(String[] args) throws IOException, InterruptedException { | |
System.out.println("Starting Java producer."); | |
// Load a driver id from an environment variable | |
// if it isn't present use "driver-1" | |
String driverId = System.getenv("DRIVER_ID"); | |
driverId = (driverId != null) ? driverId : "driver-1"; | |
// Configure the location of the bootstrap server, default serializers, | |
// Confluent interceptors | |
final Properties settings = new Properties(); | |
settings.put(ProducerConfig.CLIENT_ID_CONFIG, driverId); | |
// TODO: configure the location of the bootstrap server | |
settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); | |
settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | |
settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); | |
settings.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, | |
List.of(MonitoringProducerInterceptor.class)); | |
// Data quality | |
settings.put(ProducerConfig.ACKS_CONFIG, "all"); | |
settings.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // preserve ordering and avoid duplicates | |
settings.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 300000); | |
// Performance | |
settings.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); | |
settings.put(ProducerConfig.LINGER_MS_CONFIG, 300); // test the appropriate config using kafka-producer-perf-test | |
settings.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // test the appropriate compression configuration | |
final KafkaProducer<String, String> producer = new KafkaProducer<>(settings); | |
// Adding a shutdown hook to clean up when the application exits | |
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | |
System.out.println("Closing producer."); | |
producer.close(); | |
})); | |
int pos = 0; | |
final String[] rows = Files.readAllLines(Paths.get(DRIVER_FILE_PREFIX + driverId + ".csv"), | |
Charset.forName("UTF-8")).toArray(new String[0]); | |
// Loop forever over the driver CSV file.. | |
while (true) { | |
final String key = driverId; | |
final String value = rows[pos]; | |
// TODO: populate the message object | |
final ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TOPIC, key, value); | |
// TODO: write the lat/long position to a Kafka topic | |
// TODO: print the key and value in the callback lambda | |
producer.send(record, (md, e) -> { | |
System.out.printf("message sent for driver %s, value: %s, offset: %s\n", key, value, md.offset()); | |
}); | |
Thread.sleep(1000); | |
pos = (pos + 1) % rows.length; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment