Binary search with Kafka-backed list, using Collections.binarySearch
package fun.stuff;

import java.time.Duration;
import java.util.AbstractList;
import java.util.Collections;
import java.util.Properties;
import java.util.RandomAccess;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In this example we create a List implementation that actually uses a Kafka consumer underneath.
 * This will allow us to do away with our own implementation of binary search and just use Collections.binarySearch.
 */
public class BinarySearchWithKafkaBackedList {

    private static final Logger LOG = LoggerFactory.getLogger(BinarySearchWithKafkaBackedList.class);

    // The partition we are going to use.
    private static final TopicPartition PARTITION = new TopicPartition("binarysearch", 0);

    private static final int MESSAGE_COUNT = 2013;
    public static void main(final String[] args)
            throws Exception {

        // Put some sorted records into the partition.
        // As we are running with an empty cluster, we assume that there is no other data.
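        // Each value below is bumped by a random positive increment, so the stored values are
        // strictly increasing; a sorted sequence is the precondition for Collections.binarySearch.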
        final Properties producerProperties = new Properties();
        producerProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (Producer<String, String> producer = new KafkaProducer<>(producerProperties)) {
            int value = 0;
            for (int i = 0; i < MESSAGE_COUNT; ++i) {
                value += (1 + ThreadLocalRandom.current().nextInt(10));
                final ProducerRecord<String, String> record = new ProducerRecord<>(
                        PARTITION.topic(), PARTITION.partition(), null, String.valueOf(value));
                final RecordMetadata rm = producer.send(record).get();
                LOG.info("array[{}] = {}", rm.offset(), value);
            }
        }

        try (KafkaBackedArray array = new KafkaBackedArray(PARTITION, MESSAGE_COUNT)) {
            // Let's pick an element to find.
            final int needle = array.get(ThreadLocalRandom.current().nextInt(MESSAGE_COUNT));
            LOG.info("looking for {}", needle);
            // Typical binary search.
            final int result = Collections.binarySearch(array, needle);
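            // A non-negative result is the index (here: the Kafka offset) of the match;
            // a negative result would encode the insertion point as -(insertionPoint) - 1.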
            if (result >= 0) {
                LOG.info("found: array[{}] = {}", result, needle);
                return;
            }
            else {
                throw new IllegalStateException("needle did not exist in array");
            }
        }
    }
}

class KafkaBackedArray
        extends AbstractList<Integer>
        implements RandomAccess, AutoCloseable {
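
    // RandomAccess tells Collections.binarySearch that indexed get() calls are cheap enough,
    // so it performs the index-based search rather than an iterator-based traversal;
    // AutoCloseable lets try-with-resources close the underlying consumer.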

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBackedArray.class);

    private final Consumer<String, String> consumer;
    private final TopicPartition partition;
    private final int messageCount;

    public KafkaBackedArray(final TopicPartition partition, final int messageCount) {
        // To access the "array" we need a consumer.
        final Properties consumerProperties = new Properties();
        consumerProperties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // This is a minor "tweak": we do not need more than one record per poll.
        consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        this.consumer = new KafkaConsumer<>(consumerProperties);
        this.consumer.assign(Collections.singleton(partition));
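        // Manual partition assignment (assign instead of subscribe): no consumer group or
        // rebalancing is involved, and we always position the consumer explicitly with seek().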
        this.partition = partition;
        this.messageCount = messageCount;
    }

    @Override
    public Integer get(final int i) {
        LOG.info("Accessing element {}", i);
        this.consumer.seek(this.partition, i);
        final ConsumerRecords<String, String> records = this.consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
        // This should not fail (let's discount things like the broker erasing our data in the meantime).
        final ConsumerRecord<String, String> record = records.iterator().next();
        return Integer.valueOf(record.value());
    }

    @Override
    public int size() {
        return this.messageCount;
    }

    @Override
    public void close() {
        this.consumer.close();
    }
}