Skip to content

Instantly share code, notes, and snippets.

@JaiHirsch
Created May 1, 2019 16:23
Show Gist options
  • Save JaiHirsch/deeea82c73284d63e210508da66b85fa to your computer and use it in GitHub Desktop.
Save JaiHirsch/deeea82c73284d63e210508da66b85fa to your computer and use it in GitHub Desktop.
package scanner;
import com.mongodb.client.*;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import static com.mongodb.client.model.Filters.*;
/**
 * Scans a MongoDB collection in parallel by partitioning it on {@code _id}.
 *
 * <p>The partition boundaries come from the server's {@code splitVector} command. Each
 * resulting {@code _id} range gets its own {@link FindIterable}, and every iterable is
 * drained on its own worker thread. The per-document work lives in
 * {@link #simpleMongoIter(FindIterable)}.
 */
public class SplitVectorScanThreaded {

    /** Running total of documents visited, shared by all worker threads. */
    public static AtomicInteger count = new AtomicInteger();

    /** The chunk size handed to splitVector is (document count / this value) * average object size. */
    private static final long PARTITION_DENOMINATOR = 5L;

    private static final String DATABASE_NAME = "test";
    private static final String COLLECTION_NAME = "test";

    /** Lower bound of the first partition, so ids below the first split key are still scanned. */
    private static final ObjectId LOWEST_OBJECT_ID = new ObjectId("000000000000000000000000");

    /**
     * Connects with the driver's default settings, scans every partition concurrently and
     * prints the number of documents visited.
     *
     * @param args unused
     * @throws Throwable if connecting to the server or preparing the partitions fails
     */
    public static void main(String[] args) throws Throwable {
        try (MongoClient client = MongoClients.create()) {
            MongoDatabase db = client.getDatabase(DATABASE_NAME);
            List<FindIterable<Document>> splitVectorCursors = createSplitVectorCursors(db);
            // One thread per partition; the list always holds at least one iterable.
            ExecutorService findIterableExecutorService =
                    Executors.newFixedThreadPool(splitVectorCursors.size());
            try {
                List<Future<?>> futures = new ArrayList<>(splitVectorCursors.size());
                for (FindIterable<Document> findIterable : splitVectorCursors) {
                    futures.add(findIterableExecutorService.submit(simpleMongoIter(findIterable)));
                }
                awaitAll(futures);
            } finally {
                // Always release the worker threads, including when submission fails or the
                // wait is interrupted; otherwise the non-daemon pool keeps the JVM alive.
                findIterableExecutorService.shutdownNow();
            }
        }
        System.out.println("Iterated through: " + count.get());
    }

    /**
     * Waits for every partition scan to finish. A failed scan is reported and the remaining
     * scans are still awaited, so the final count is best-effort rather than all-or-nothing.
     */
    private static void awaitAll(List<Future<?>> futures) {
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // Restore the flag for the caller and stop waiting.
                Thread.currentThread().interrupt();
                System.err.println("Interrupted while waiting for partition scans; count is partial");
                return;
            } catch (ExecutionException e) {
                System.err.println("A partition scan failed; count is partial");
                e.getCause().printStackTrace();
            }
        }
    }

    /**
     * Builds one find query per {@code _id} range. Each range is {@code [key(i), key(i+1))},
     * except the last one, which is open-ended ({@code _id >= key(last)}).
     */
    private static List<FindIterable<Document>> createSplitVectorCursors(MongoDatabase db) {
        List<ObjectId> splitKeys = prepareListOfSplitKeys(db);
        MongoCollection<Document> collection = db.getCollection(COLLECTION_NAME);
        List<FindIterable<Document>> iterables = new ArrayList<>(splitKeys.size());
        int lastIndex = splitKeys.size() - 1;
        for (int i = 0; i <= lastIndex; i++) {
            ObjectId minKey = splitKeys.get(i);
            Bson query = (i < lastIndex)
                    ? and(gte("_id", minKey), lt("_id", splitKeys.get(i + 1)))
                    : gte("_id", minKey);
            iterables.add(collection.find(query));
        }
        return iterables;
    }

    /**
     * Returns the sorted split keys, prefixed with the all-zero ObjectId. The result is never
     * empty: with no split keys it contains only the all-zero id, giving a single partition.
     */
    private static List<ObjectId> prepareListOfSplitKeys(MongoDatabase db) {
        List<ObjectId> splitKeys = new ArrayList<>();
        for (Document splitKey : getSplitVectorResults(db)) {
            splitKeys.add(splitKey.getObjectId("_id"));
        }
        Collections.sort(splitKeys);
        splitKeys.add(0, LOWEST_OBJECT_ID);
        return splitKeys;
    }

    /**
     * Runs {@code splitVector} on the collection with a chunk size derived from
     * {@code collStats}, and returns the {@code splitKeys} documents.
     *
     * @return the split key documents, or an empty list when the collection reports no
     *         documents or the server returns no {@code splitKeys} field
     */
    private static List<Document> getSplitVectorResults(MongoDatabase db) {
        Document collStats =
                db.runCommand(new Document("collStats", COLLECTION_NAME).append("scale", 1));
        long documentCount = readLong(collStats, "count");
        long avgObjSize = readLong(collStats, "avgObjSize");
        if (documentCount <= 0 || avgObjSize <= 0) {
            // Nothing to partition; the caller falls back to one full-range scan.
            return Collections.emptyList();
        }
        // Clamp to 1 so collections smaller than the denominator never request a zero-byte chunk.
        long partitionKeyCount = Math.max(1L, documentCount / PARTITION_DENOMINATOR);
        Document splitVectorCommand =
                new Document("splitVector", DATABASE_NAME + "." + COLLECTION_NAME)
                        .append("keyPattern", new Document("_id", 1))
                        .append("maxChunkSizeBytes", partitionKeyCount * avgObjSize);
        // The reply is untyped BSON; this assumes splitKeys is an array of documents.
        @SuppressWarnings("unchecked")
        List<Document> splitKeys = (List<Document>) db.runCommand(splitVectorCommand).get("splitKeys");
        return splitKeys != null ? splitKeys : Collections.<Document>emptyList();
    }

    /**
     * Reads a numeric field as a {@code long} whatever numeric type the server used, and
     * returns 0 when the field is missing or not a number.
     */
    private static long readLong(Document document, String key) {
        Object value = document.get(key);
        return value instanceof Number ? ((Number) value).longValue() : 0L;
    }

    /**
     * Wraps an iterable in a task that visits every document it yields. This is where you act
     * on the documents in the iterable; the current action just increments {@link #count}.
     *
     * @param iterable the query whose documents should be visited
     * @return a task that drains {@code iterable} when run
     */
    public static Runnable simpleMongoIter(FindIterable<Document> iterable) {
        // The Consumer cast is kept from the original; it selects the Consumer-based forEach overload.
        return () -> iterable.forEach(
                (Consumer<Document>) document -> SplitVectorScanThreaded.count.incrementAndGet());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment