Skip to content

Instantly share code, notes, and snippets.

@jyemin
Last active May 6, 2024 13:57
Show Gist options
  • Save jyemin/385f03966bde6d89aef7e7b82622b74b to your computer and use it in GitHub Desktop.
Save jyemin/385f03966bde6d89aef7e7b82622b74b to your computer and use it in GitHub Desktop.
Sketch of KAFKA-374 algorithm based on BaseCluster#selectServer
package org.mongodb;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoTimeoutException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterListener;
import org.bson.Document;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Sketch of the KAFKA-374 algorithm: before writing a batch, wait (for a bounded time) until the
 * cluster description published by the driver reports a writable server, so that a missing primary
 * surfaces as a retriable timeout instead of a failed insert.
 *
 * <p>Synchronisation scheme: a {@link ClusterListener} stores every new {@link ClusterDescription}
 * and then swaps in a fresh {@link CountDownLatch}, releasing the previous one. A waiter reads the
 * latch <em>first</em> and the description <em>second</em>; because the listener publishes in the
 * opposite order, a waiter that observed a stale description is guaranteed to be holding a latch
 * that will still be (or already has been) released, so no update can be missed.
 */
public class KAFKA374 implements AutoCloseable {
    /**
     * Upper bound on how long {@link #write(List)} waits for a writable server. 30 seconds matches
     * the driver's documented default server selection timeout.
     */
    private static final long SERVER_SELECTION_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(30);

    /** Latch for the current "phase"; replaced and released on every cluster description change. */
    private final AtomicReference<CountDownLatch> phase = new AtomicReference<>(new CountDownLatch(1));

    /**
     * Most recent cluster description. Declared before {@link #client} so the listener, which may
     * fire as soon as the client is created, always finds the holder initialised.
     */
    private final AtomicReference<ClusterDescription> clusterDescription = new AtomicReference<>();

    private final MongoClient client = MongoClients.create(MongoClientSettings.builder()
            .applyToClusterSettings(builder ->
                    builder.addClusterListener(new SynchronizingClusterListener()))
            .build());

    /** Runs the best-effort "kick" command off the calling thread so the wait loop never blocks on it. */
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    /** Creates the client and seeds the cluster description. */
    public KAFKA374() {
        // Seed only when the listener has not published anything yet: an unconditional assignment
        // could overwrite a newer description delivered between the read and the store.
        clusterDescription.compareAndSet(null, client.getClusterDescription());
    }

    /**
     * Writes an empty document once per second, forever.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting or sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        try (var test = new KAFKA374()) {
            //noinspection InfiniteLoopStatement
            while (true) {
                try {
                    test.write(List.of(new Document()));
                } catch (MongoTimeoutException e) {
                    // Stand-in for the connector signalling a retriable failure: nothing was
                    // written, so the next pass simply tries again.
                    System.err.println("Write deferred, will retry: " + e.getMessage());
                }
                //noinspection BusyWait
                Thread.sleep(1000);
            }
        }
    }

    /**
     * Waits until a writable server is known, then inserts the documents into {@code test.test}.
     *
     * @param documentList the documents to insert
     * @throws InterruptedException  if interrupted while waiting for a cluster description change
     * @throws MongoTimeoutException if no writable server appears within the selection timeout
     */
    private void write(List<Document> documentList) throws InterruptedException {
        // System.nanoTime() is monotonic, so the deadline is immune to wall-clock adjustments.
        final long deadline = System.nanoTime() + SERVER_SELECTION_TIMEOUT_NANOS;

        // Order matters: latch first, description second (see the class comment).
        CountDownLatch currentPhase = phase.get();
        ClusterDescription curDescription = clusterDescription.get();
        boolean hasKickedMonitors = false;

        while (!curDescription.hasWritableServer()) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new MongoTimeoutException("Timed out after "
                        + TimeUnit.NANOSECONDS.toMillis(SERVER_SELECTION_TIMEOUT_NANOS)
                        + " ms waiting for a writable server");
            }

            // Encourage the server monitors to connect to a writable server so that this loop
            // doesn't block for longer than necessary. Best effort: the Future is discarded, so a
            // failed ping is deliberately ignored.
            if (!hasKickedMonitors) {
                executorService.submit(() -> {
                    client.getDatabase("admin").runCommand(new Document("ping", 1));
                });
                hasKickedMonitors = true;
            }

            // A timed-out await is not checked here; the next iteration re-evaluates both the
            // description and the deadline.
            currentPhase.await(remainingNanos, TimeUnit.NANOSECONDS);
            currentPhase = phase.get();
            curDescription = clusterDescription.get();
        }

        try {
            // Simulate what the Sink Connector does
            client.getDatabase("test").getCollection("test").insertMany(documentList);
        } catch (RuntimeException e) {
            // Placeholder for "write all documents to DLQ": report the failure rather than
            // dropping it silently.
            System.err.println("Insert failed; " + documentList.size()
                    + " document(s) would be routed to the DLQ: " + e);
        }
    }

    /** Stops the helper executor (interrupting any in-flight ping) and closes the client. */
    @Override
    public void close() {
        executorService.shutdownNow();
        client.close();
    }

    /** Publishes each new cluster description and then wakes every thread waiting on the old phase. */
    private final class SynchronizingClusterListener implements ClusterListener {
        @Override
        public void clusterDescriptionChanged(ClusterDescriptionChangedEvent event) {
            // Description before latch swap: a woken waiter must observe the new description.
            clusterDescription.set(event.getNewDescription());
            phase.getAndSet(new CountDownLatch(1)).countDown();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment