Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamkotwasinski/f7fd21fd5de9d6b8ce6d3d0a97011aa4 to your computer and use it in GitHub Desktop.
Save adamkotwasinski/f7fd21fd5de9d6b8ce6d3d0a97011aa4 to your computer and use it in GitHub Desktop.
Binary search with a Kafka partition (using Consumer explicitly)
package fun.stuff;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In this example we are going to use Kafka as a remote array.
* As it is an array of sorted values, we can implement a binary search on it, with each array access implemented with a Kafka consumer.
*/
public class BinarySearchWithKafkaExplicitly {

    private static final Logger LOG = LoggerFactory.getLogger(BinarySearchWithKafkaExplicitly.class);

    // The partition we are going to use.
    private static final TopicPartition PARTITION = new TopicPartition("binarysearch", 0);

    // Number of values written to the partition, i.e. the length of our "array".
    private static final int MESSAGE_COUNT = 2013;

    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    /**
     * Writes {@link #MESSAGE_COUNT} strictly increasing values to {@link #PARTITION}, picks one of them
     * at random, and then locates it again with a binary search over the offsets that were written.
     *
     * @param args ignored
     * @throws Exception if producing a record fails (propagated from {@code Future.get()})
     * @throws IllegalStateException if the value looked for is not found, or a read does not return
     *         the record at the requested offset
     */
    public static void main(final String[] args)
            throws Exception {

        // Put some sorted records into the partition.
        // The partition may already hold data (e.g. from a previous run), so instead of assuming that
        // our "array" starts at offset 0 we remember the offsets the broker actually assigned.
        // NOTE: assumes nobody else writes to PARTITION meanwhile, so that every offset in
        // [firstOffset, lastOffset] holds one of our values.
        final Properties producerProperties = new Properties();
        producerProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        long firstOffset = -1;
        long lastOffset = -1;
        try (Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            int value = 0;
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                // Strictly increasing: each step adds between 1 and 10.
                value += (1 + ThreadLocalRandom.current().nextInt(10));
                final ProducerRecord<String, String> record = new ProducerRecord<>(
                        PARTITION.topic(), PARTITION.partition(), null, String.valueOf(value));
                // Blocking on get() keeps the sends sequential and hands us the assigned offset.
                final RecordMetadata rm = producer.send(record).get();
                if (i == 0) {
                    firstOffset = rm.offset();
                }
                lastOffset = rm.offset();
                LOG.info("array[{}] = {}", rm.offset(), value);
            }
        }

        // To access the "array" we need a consumer.
        final Properties consumerProperties = new Properties();
        consumerProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // This is a minor "tweak", we do not need more than one record.
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
            consumer.assign(Collections.singleton(PARTITION));

            // Let's pick an element to find, among the offsets we have just written.
            final long pickedOffset =
                    firstOffset + ThreadLocalRandom.current().nextLong(lastOffset - firstOffset + 1);
            final int needle = arrayAccess(pickedOffset, consumer);
            LOG.info("looking for {}", needle);

            final long found = binarySearch(needle, firstOffset, lastOffset, consumer);
            if (needle != arrayAccess(found, consumer)) {
                throw new IllegalStateException("value-looked-for does not exist in array");
            }
            LOG.info("found: array[{}] = {}", found, needle);
        }
    }

    /**
     * Binary search over the sorted "array" stored at offsets {@code lo..hi} (inclusive).
     *
     * @param needle value to look for
     * @param lo first offset of the array
     * @param hi last offset of the array; must be {@code >= lo}
     * @param consumer consumer assigned to {@link #PARTITION}
     * @return the offset of the last element {@code <= needle}, or {@code lo} if every element is
     *         greater than {@code needle}; the caller must still compare the value at that offset
     */
    private static long binarySearch(final int needle, final long lo, final long hi,
            final Consumer<String, String> consumer) {
        long l = lo;
        long r = hi;
        while (l < r) {
            // Upper midpoint (ceil), computed without overflowing l + r; guarantees progress on l = m.
            final long m = l + (r - l + 1) / 2;
            final int arrayM = arrayAccess(m, consumer);
            if (arrayM > needle) {
                r = m - 1;
            } else {
                l = m;
            }
        }
        return l;
    }

    /**
     * Our analogy for accessing {@code array[i]}: seeks to offset {@code i} and reads the single
     * record stored there. Blocks until a record is returned.
     *
     * @param i offset to read
     * @param consumer consumer assigned to {@link #PARTITION}
     * @return the record value parsed as an int
     * @throws IllegalStateException if no record is returned, or the returned record is not at offset
     *         {@code i}
     * @throws NumberFormatException if the record value is not an int
     */
    private static int arrayAccess(final long i, final Consumer<String, String> consumer) {
        LOG.info("Accessing element {}", i);
        consumer.seek(PARTITION, i);
        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        if (records.isEmpty()) {
            throw new IllegalStateException("no record returned for offset " + i);
        }
        final ConsumerRecord<String, String> record = records.iterator().next();
        // If offset i does not exist (deleted by retention, or past the log end), auto.offset.reset=earliest
        // makes the consumer silently jump to the earliest offset, so verify we read what we asked for.
        if (record.offset() != i) {
            throw new IllegalStateException(
                    "expected record at offset " + i + " but got offset " + record.offset());
        }
        return Integer.parseInt(record.value());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment