Due to environment limitations, the test environment is two Linux servers forming an Ambari cluster with:
- A ZooKeeper cluster shared by the NiFi and Kafka clusters
- A NiFi cluster with two nodes
- A Kafka cluster
- Pre-Step: sequentially put 10000 messages into TopicA and 10000 messages into TopicB
- both topics with 2 replicas and a single partition
- messages are put into the topic in sequence (Msg1, Msg2, ...)
- Test Step: start the NiFi flows
- one flow subscribes to TopicA and publishes the consumed messages to TopicA.D
- one flow subscribes to TopicB and publishes the consumed messages to TopicB.D
- all NiFi processors run on the NiFi cluster, configured with "Concurrent Tasks = 1" and Execution = "Primary node"
- the subscribe processor is set to consume from "earliest" with a unique client Id
- After the step, read messages from TopicA.D and TopicB.D and verify that the messages are in sequence
Conclusion: messages remained in sequence, although NiFi has 2 instances.
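The verification in the final step (messages in TopicA.D / TopicB.D still reading Msg1, Msg2, ... in order) can be sketched as a simple counter walk over the consumed values. Class and method names here are illustrative, not part of the actual test harness:

```java
import java.util.Arrays;
import java.util.List;

public class SequenceCheck {
    // True if the consumed values are exactly Msg1, Msg2, ... in order.
    static boolean inSequence(List<String> values) {
        int expected = 1;
        for (String value : values) {
            if (!value.equals("Msg" + expected)) {
                return false;
            }
            expected++;
        }
        return true;
    }

    public static void main(String[] args) {
        // In the real test these values come from consuming TopicA.D / TopicB.D;
        // here small in-order and out-of-order samples stand in.
        System.out.println(inSequence(Arrays.asList("Msg1", "Msg2", "Msg3"))); // prints true
        System.out.println(inSequence(Arrays.asList("Msg1", "Msg3", "Msg2"))); // prints false
    }
}
```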
Same as above, but with the message volume increased to 50k; passed.
- Pre-Step: sequentially put 20000 messages into TopicA
- topic with 2 replicas and a single partition
- messages are put into the topic in sequence (Msg1, Msg2, ...)
- Test Step: start the NiFi flows
- the flow subscribes to TopicA and publishes the consumed messages to TopicA.D
- all NiFi processors run on the NiFi cluster, configured with "Concurrent Tasks = 1" and Execution = "All nodes"
- the subscribe processor is set to consume from "earliest" with a unique client Id
- After the step, read messages from TopicA.D and verify that the messages are in sequence
Conclusion: messages remained in sequence, although NiFi has 2 instances and the processors are allowed to run on both nodes.
- Pre-Step: sequentially put 40000 messages into TopicA
- topic with 2 replicas and a single partition
- messages are put into the topic in sequence (Msg1, Msg2, ...)
- check the NiFi primary node host name and instance process id
- Test Step: start the NiFi flows; in the middle of processing, kill the primary NiFi instance
- the flow subscribes to TopicA and publishes the consumed messages to TopicA.D
- all NiFi processors run on the NiFi cluster, configured with "Concurrent Tasks = 1" and Execution = "Primary node"
- the subscribe processor is set to consume from "earliest" with a unique client Id
- After the step, read messages from TopicA.D; although the NiFi instance was killed in the middle of message processing, the messages kept their sequence.
- We initially had an issue with message processing, but after changing the message publisher to the correct client version (Kafka 0.10), the problem was solved.
Conclusion: messages remained in sequence; batching & transactions work to maintain message sequence.
$ kafka-topics.sh --create --zookeeper {zServer1:2181,zServer2:2181} --replication-factor 2 --partitions 2 --topic TopicA
Assign different keys when sending out messages, and check the behaviour of partition assignment.
producer.send(new ProducerRecord<String, String>(topic, "Avasdr", "Avasdr.Msg" + i), new Callback() {
    // Log which partition the keyed message landed on
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
            return; // metadata is null on failure
        }
        System.out.println("key: Avasdr, Partition: " + metadata.partition());
    }
});
Conclusion: the messages are sent out to different partitions using a round-robin (in groups of 3) strategy.
- For example, messages with 3 different keys are all sent to partition 0; with 5 different keys, the first 3 keys are assigned to partition 0 and the remaining 2 keys to partition 1.
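Whatever grouping is observed, the Java client's default partitioner is deterministic for keyed messages: it hashes the key (murmur2 in the real client) and takes it modulo the partition count, so a given key always lands on the same partition. A simplified sketch of that shape, using `String.hashCode()` in place of murmur2 (so actual partition numbers on a broker will differ):

```java
public class KeyPartitionSketch {
    // Deterministic key -> partition mapping in the shape of Kafka's default
    // partitioner; String.hashCode() is a stand-in for the client's murmur2.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[]{"KeyA", "KeyB", "KeyC", "KeyD", "KeyE"}) {
            // Re-sending the same key always yields the same partition.
            System.out.println("key: " + key + ", Partition: " + partitionFor(key, 2));
        }
    }
}
```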
Here's an example of consuming messages from a specified partition with the Java client:
TopicPartition partition = new TopicPartition(topicName, partitionNum);
consumer.assign(Arrays.asList(partition));
for (int i = 0; i < numOfMsg; i++) {
    ConsumerRecords<String, String> records = consumer.poll(batch);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("message value: " + record.value());
    }
}
The easiest way to maintain message sequence while still allowing parallel processing is to publish messages to the Kafka topic with a key. Messages with the same key are always sent to the same partition, and one partition is guaranteed to be consumed by only one consumer from the NiFi Kafka client (even if we allow multi-threading).
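A small in-memory simulation of that claim (all names hypothetical; partitions modeled as plain lists): route keyed messages to partitions by a key hash, then check that each key's messages keep their publish order inside their partition:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyedOrderSketch {
    // Route messages {key, seq} into per-partition lists by key hash,
    // the way keyed publishing distributes them across partitions.
    static Map<Integer, List<String[]>> route(List<String[]> messages, int numPartitions) {
        Map<Integer, List<String[]>> partitions = new HashMap<>();
        for (String[] msg : messages) {
            int p = (msg[0].hashCode() & 0x7fffffff) % numPartitions;
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(msg);
        }
        return partitions;
    }

    // Within each partition, messages sharing a key keep their publish order.
    static boolean perKeyOrderKept(Map<Integer, List<String[]>> partitions) {
        for (List<String[]> partition : partitions.values()) {
            Map<String, Integer> lastSeq = new HashMap<>();
            for (String[] msg : partition) {
                int seq = Integer.parseInt(msg[1]);
                Integer prev = lastSeq.get(msg[0]);
                if (prev != null && prev >= seq) {
                    return false;
                }
                lastSeq.put(msg[0], seq);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // 100 messages interleaved across 5 keys, published in sequence 1..100.
        List<String[]> messages = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            messages.add(new String[]{"key" + (i % 5), String.valueOf(i)});
        }
        System.out.println(perKeyOrderKept(route(messages, 2))); // prints true
    }
}
```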
- https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
- https://howtoprogram.xyz/2016/06/04/write-apache-kafka-custom-partitioner/
Although it’s possible to increase the number of partitions over time, one has to be careful if messages are produced with keys. When publishing a keyed message, Kafka deterministically maps the message to a partition based on the hash of the key. This provides a guarantee that messages with the same key are always routed to the same partition. This guarantee can be important for certain applications since messages within a partition are always delivered in order to the consumer. If the number of partitions changes, such a guarantee may no longer hold. To avoid this situation, a common practice is to over-partition a bit.
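The caveat above can be made concrete: with a hash-modulo mapping, changing the partition count moves some keys to a different partition, which is why the per-key ordering guarantee breaks across a resize. A sketch, again with `String.hashCode()` standing in for the client's murmur2 hash:

```java
public class RepartitionSketch {
    // Same hash-modulo shape as the default partitioner (hash is a stand-in).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Growing the topic from 2 to 3 partitions relocates some keys.
        for (int i = 1; i <= 10; i++) {
            String key = "key" + i;
            int before = partitionFor(key, 2);
            int after = partitionFor(key, 3);
            if (before != after) {
                System.out.println(key + ": partition " + before + " -> " + after);
            }
        }
    }
}
```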
Test Steps
- prepare 50k messages in TopicA
- subscribe to the messages from the NiFi cluster (concurrency = 1, run on primary node), then send them to the remote NiFi via the site-to-site protocol
- the remote NiFi reads the messages and publishes them to TopicA.D
- verify the message sequence
Configuration
- All Kafka processors use version 0.10
- All publisher processors are set to "Guarantee Replicated Delivery"
- Single threading and "FIFO" prioritizer
50k messages, killing the primary NiFi in the middle of processing. Failed.
Result: the target topic received 50k messages, but the sequence is wrong; around 8k messages were received only after the NiFi instance was started again.