Welcome to session 6, lab 3. The work for this lab is done in ~/kafka-training/lab6.3.
In this lab, you are going to implement At-Most-Once and At-Least-Once message semantics from the
consumer perspective.
Please refer to the Kafka course notes for any updates or changes to this lab.
Find the latest version of this lab here.
Consumers join a consumer group after they subscribe and then poll() is called.
The consumer automatically sends periodic heartbeats to the Kafka brokers. If the consumer crashes or is unable to send heartbeats for the duration of session.timeout.ms, then the consumer is deemed dead and its partitions are reassigned.
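These liveness settings are ordinary consumer configuration properties. A minimal sketch, assuming a local broker and an illustrative group id (both placeholders); note that heartbeat.interval.ms must be well below session.timeout.ms:

```java
import java.util.Properties;

public class ConsumerLivenessProps {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "stockPriceConsumerGroup"); // illustrative group id
        // Consumer is deemed dead if the broker sees no heartbeat within this window.
        props.put("session.timeout.ms", "10000");
        // Heartbeats are sent on this interval; keep it well below session.timeout.ms.
        props.put("heartbeat.interval.ms", "3000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("session.timeout.ms"));
    }
}
```

These properties are passed to the KafkaConsumer constructor as usual; the values shown are illustrative, not recommendations.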
Instead of subscribing to the topic using subscribe(), you can call assign(Collection) with the full list of topic partitions:
String topic = "log-replication";
TopicPartition part0 = new TopicPartition(topic, 0);
TopicPartition part1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(part0, part1));
Use the consumer as before with poll(). Manual partition assignment negates the use of group coordination and automatic consumer failover. Each consumer acts independently, even if it is in a consumer group (use a unique group id to avoid confusion). You have to use assign() or subscribe(), but not both. Use subscribe() to let Kafka manage failover and load balancing with consumer groups. Use assign() if you want to work with exact partitions.
Calling poll() marks the consumer as alive. If the consumer continues to call poll(), then it is alive, stays in the consumer group, and gets messages for the partitions assigned to it (it has to call poll() within every max.poll.interval.ms interval). If the consumer stops calling poll(), it is considered dead even if it is still sending heartbeats.
Processing of records from poll() has to be faster than the max.poll.interval.ms interval, or your consumer could be marked dead!
The max.poll.records setting limits the total records returned from a single poll() call, which makes it easier to predict the maximum time needed to process the records on each poll interval.
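Both knobs are plain consumer configuration properties. A minimal sketch with illustrative values (not recommendations):

```java
import java.util.Properties;

public class PollTuningProps {
    public static Properties pollProps() {
        final Properties props = new Properties();
        // Cap on the number of records returned by a single poll() call.
        props.put("max.poll.records", "500");
        // poll() must be called again within this window or the consumer is marked dead.
        props.put("max.poll.interval.ms", "300000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(pollProps().getProperty("max.poll.records"));
    }
}
```

A smaller max.poll.records means each poll loop iteration does less work, making it easier to stay under max.poll.interval.ms.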
There are three message delivery semantics: at most once, at least once, and exactly once. At most once means messages may be lost but are never redelivered. At least once means messages are never lost but may be redelivered. Exactly once means each message is delivered once and only once. Exactly once is preferred but more expensive, as it requires more bookkeeping for the producer and consumer.
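The difference between at-most-once and at-least-once comes down to whether the offset commit happens before or after processing. The following broker-free toy model (plain Java, no Kafka API; all names are illustrative) injects a crash between the two steps to show loss versus redelivery:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeliverySemanticsDemo {

    // commitFirst=true  -> commit the offset before processing (at-most-once)
    // commitFirst=false -> process the record before committing (at-least-once)
    public static List<String> run(List<String> log, boolean commitFirst) {
        final List<String> processed = new ArrayList<>();
        int committed = 0;

        // First pass over record 0, with a crash injected between the two steps.
        if (commitFirst) {
            committed = 1;              // offset committed...
            // ...crash before processing: record 0 is lost
        } else {
            processed.add(log.get(0));  // record processed...
            // ...crash before commit: committed stays 0, record 0 will be redelivered
        }

        // Restart: resume from the last committed offset.
        for (int offset = committed; offset < log.size(); offset++) {
            processed.add(log.get(offset));
        }
        return processed;
    }

    public static void main(String[] args) {
        // At-most-once: prints [b, c] — record "a" was lost.
        System.out.println(run(Arrays.asList("a", "b", "c"), true));
        // At-least-once: prints [a, a, b, c] — record "a" was redelivered.
        System.out.println(run(Arrays.asList("a", "b", "c"), false));
    }
}
```

The code blocks below apply exactly this ordering choice to a real consumer: the position of consumer.commitSync() relative to record processing determines which semantic you get.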
package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SimpleStockPriceConsumer {
    ...
    private static void pollRecordsAndProcess(
            int readCountStatusUpdate,
            Consumer<String, StockPrice> consumer,
            Map<String, StockPrice> map, int readCount) {
        final ConsumerRecords<String, StockPrice> consumerRecords =
                consumer.poll(1000);
        try {
            startTransaction();                   //Start DB Transaction
            processRecords(map, consumerRecords); //Process the records
            consumer.commitSync();                //Commit the Kafka offset
            commitTransaction();                  //Commit DB Transaction
        } catch (CommitFailedException ex) {
            logger.error("Failed to commit sync to log", ex);
            rollbackTransaction();                //Rollback Transaction
        } catch (DatabaseException dte) {
            logger.error("Failed to write to DB", dte);
            rollbackTransaction();                //Rollback Transaction
        }
        if (readCount % readCountStatusUpdate == 0) {
            displayRecordsStatsAndStocks(map, consumerRecords);
        }
    }
    ...
}
ACTION - EDIT src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java
and implement At-Least-Once Semantics
package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SimpleStockPriceConsumer {
    ...
    private static void pollRecordsAndProcess(
            int readCountStatusUpdate,
            Consumer<String, StockPrice> consumer,
            Map<String, StockPrice> map, int readCount) {
        final ConsumerRecords<String, StockPrice> consumerRecords =
                consumer.poll(1000);
        try {
            startTransaction();                   //Start DB Transaction
            consumer.commitSync();                //Commit the Kafka offset
            processRecords(map, consumerRecords); //Process the records
            commitTransaction();                  //Commit DB Transaction
        } catch (CommitFailedException ex) {
            logger.error("Failed to commit sync to log", ex);
            rollbackTransaction();                //Rollback Transaction
        } catch (DatabaseException dte) {
            logger.error("Failed to write to DB", dte);
            rollbackTransaction();                //Rollback Transaction
        }
        if (readCount % readCountStatusUpdate == 0) {
            displayRecordsStatsAndStocks(map, consumerRecords);
        }
    }
    ...
}
ACTION - EDIT src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java
and implement At-Most-Once Semantics
package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SimpleStockPriceConsumer {
    ...
    private static void pollRecordsAndProcess(
            int readCountStatusUpdate,
            Consumer<String, StockPrice> consumer,
            Map<String, StockPrice> map, int readCount) {
        final ConsumerRecords<String, StockPrice> consumerRecords =
                consumer.poll(1000);
        consumerRecords.forEach(record -> {
            try {
                startTransaction();             //Start DB Transaction
                //Commit Kafka at the exact location for this record, and only this record.
                final TopicPartition recordTopicPartition =
                        new TopicPartition(record.topic(), record.partition());
                final Map<TopicPartition, OffsetAndMetadata> commitMap =
                        Collections.singletonMap(recordTopicPartition,
                                new OffsetAndMetadata(record.offset() + 1));
                consumer.commitSync(commitMap); //Kafka Commit
                processRecords(record);         //Process the record
                commitTransaction();            //Commit DB Transaction
            } catch (CommitFailedException ex) {
                logger.error("Failed to commit sync to log", ex);
                rollbackTransaction();          //Rollback Transaction
            } catch (DatabaseException dte) {
                logger.error("Failed to write to DB", dte);
                rollbackTransaction();          //Rollback Transaction
            }
        });
        if (readCount % readCountStatusUpdate == 0) {
            displayRecordsStatsAndStocks(map, consumerRecords);
        }
    }
    ...
}
ACTION - EDIT src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java
and implement fine-grained At-Most-Once Semantics
package com.cloudurable.kafka.consumer;

import com.cloudurable.kafka.StockAppConstants;
import com.cloudurable.kafka.model.StockPrice;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SimpleStockPriceConsumer {
    ...
    private static void pollRecordsAndProcess(
            int readCountStatusUpdate,
            Consumer<String, StockPrice> consumer,
            Map<String, StockPrice> map, int readCount) {
        final ConsumerRecords<String, StockPrice> consumerRecords =
                consumer.poll(1000);
        consumerRecords.forEach(record -> {
            try {
                startTransaction();             //Start DB Transaction
                processRecords(record);         //Process the record
                //Commit Kafka at the exact location for this record, and only this record.
                final TopicPartition recordTopicPartition =
                        new TopicPartition(record.topic(), record.partition());
                final Map<TopicPartition, OffsetAndMetadata> commitMap =
                        Collections.singletonMap(recordTopicPartition,
                                new OffsetAndMetadata(record.offset() + 1));
                consumer.commitSync(commitMap); //Kafka Commit
                commitTransaction();            //Commit DB Transaction
            } catch (CommitFailedException ex) {
                logger.error("Failed to commit sync to log", ex);
                rollbackTransaction();          //Rollback Transaction
            } catch (DatabaseException dte) {
                logger.error("Failed to write to DB", dte);
                rollbackTransaction();          //Rollback Transaction
            }
        });
        if (readCount % readCountStatusUpdate == 0) {
            displayRecordsStatsAndStocks(map, consumerRecords);
        }
    }
    ...
}
ACTION - EDIT src/main/java/com/cloudurable/kafka/consumer/SimpleStockPriceConsumer.java
and implement fine-grained At-Least-Once Semantics
It should all run. Stop the consumer and producer when finished.