Binary search with a Kafka-backed list, using Collections.binarySearch
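The core observation is that, for a list marked RandomAccess, Collections.binarySearch drives the whole search through just two methods, size() and get(int), so the elements can live anywhere, including a Kafka partition. Before the Kafka-backed version below, here is a minimal in-memory sketch of that mechanism; SquaresList is a hypothetical illustration and not part of the gist.

import java.util.AbstractList;
import java.util.Collections;
import java.util.RandomAccess;

// Hypothetical example: element i is i * i, so the list is sorted by construction.
class SquaresList extends AbstractList<Integer> implements RandomAccess {

    @Override
    public Integer get(final int i) {
        return i * i;
    }

    @Override
    public int size() {
        return 1000;
    }

    public static void main(final String[] args) {
        // Finds index 25 (25 * 25 == 625) using only size() and roughly ten get() calls.
        System.out.println(Collections.binarySearch(new SquaresList(), 625));
    }
}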
package fun.stuff;

import java.time.Duration;
import java.util.AbstractList;
import java.util.Collections;
import java.util.Properties;
import java.util.RandomAccess;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * In this example we create a List implementation that actually uses a Kafka consumer underneath.
 * This allows us to do away with our own implementation of binary search and just use Collections.binarySearch.
 */
public class BinarySearchWithKafkaBackedList {

    private static final Logger LOG = LoggerFactory.getLogger(BinarySearchWithKafkaBackedList.class);

    // The partition we are going to use.
    private static final TopicPartition PARTITION = new TopicPartition("binarysearch", 0);

    private static final int MESSAGE_COUNT = 2013;
    public static void main(final String[] args)
            throws Exception {

        // Put some sorted records into the partition.
        // As we are running with an empty cluster, we assume that there is no other data.
        final Properties producerProperties = new Properties();
        producerProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            int value = 0;
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                value += (1 + ThreadLocalRandom.current().nextInt(10));
                final ProducerRecord<String, String> record = new ProducerRecord<>(
                        PARTITION.topic(), PARTITION.partition(), null, String.valueOf(value));
                final RecordMetadata rm = producer.send(record).get();
                LOG.info("array[{}] = {}", rm.offset(), value);
            }
        }

        try (KafkaBackedArray array = new KafkaBackedArray(PARTITION, MESSAGE_COUNT)) {
            // Let's pick an element to find.
            final int needle = array.get(ThreadLocalRandom.current().nextInt(MESSAGE_COUNT));
            LOG.info("looking for {}", needle);

            // Typical binary search.
            final int result = Collections.binarySearch(array, needle);
            if (result >= 0) {
                LOG.info("found: array[{}] = {}", result, needle);
                return;
            }
            else {
                throw new IllegalStateException("needle did not exist in array");
            }
        }
    }
}
class KafkaBackedArray
        extends AbstractList<Integer>
        implements RandomAccess, AutoCloseable {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBackedArray.class);

    private final Consumer<String, String> consumer;
    private final TopicPartition partition;
    private final int messageCount;

    public KafkaBackedArray(final TopicPartition partition, final int messageCount) {
        // To access the "array" we need a consumer.
        final Properties consumerProperties = new Properties();
        consumerProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // This is a minor "tweak", we do not need more than one record.
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        this.consumer = new KafkaConsumer<>(consumerProperties);
        this.consumer.assign(Collections.singleton(partition));
        this.partition = partition;
        this.messageCount = messageCount;
    }

    @Override
    public Integer get(final int i) {
        LOG.info("Accessing element {}", i);
        this.consumer.seek(this.partition, i);
        final ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        // This should not fail (let's discount things like broker erasing our data in the meantime).
        final ConsumerRecord<String, String> record = records.iterator().next();
        return Integer.valueOf(record.value());
    }

    @Override
    public int size() {
        return this.messageCount;
    }

    @Override
    public void close() {
        this.consumer.close();
    }
}
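KafkaBackedArray takes the element count as a constructor argument because the example has just produced exactly MESSAGE_COUNT records itself. For a partition that already holds sorted data, the size could plausibly be derived from the partition offsets instead; the sizeFromOffsets() helper below is a hypothetical sketch, not part of the gist, and assumes a gap-free log starting at the beginning offset (no compaction, no transaction markers), so that offsets map one-to-one onto list indices.

    // Hypothetical alternative to the messageCount constructor argument:
    // derive the element count from the partition's beginning and end offsets.
    // Assumes a gap-free log, so offsets map 1:1 onto list indices.
    private int sizeFromOffsets() {
        final long start = this.consumer.beginningOffsets(Collections.singleton(this.partition)).get(this.partition);
        final long end = this.consumer.endOffsets(Collections.singleton(this.partition)).get(this.partition);
        return Math.toIntExact(end - start);
    }

Either way, the search itself stays cheap: Collections.binarySearch on a RandomAccess list makes at most about log2(n) + 1 calls to get(), so locating the needle among 2013 records costs roughly eleven seek-and-poll round trips to the broker instead of a full partition scan.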