Skip to content

Instantly share code, notes, and snippets.

@jkeys089
Created May 23, 2014 03:39
Show Gist options
  • Save jkeys089/158ae8a14dfa6998680a to your computer and use it in GitHub Desktop.
Save jkeys089/158ae8a14dfa6998680a to your computer and use it in GitHub Desktop.
Demonstrates blocking behavior of MongoIterator when using MongoCollection.findAsync
import com.allanbank.mongodb.*;
import com.allanbank.mongodb.bson.Document;
import com.allanbank.mongodb.builder.*;
import com.allanbank.mongodb.bson.builder.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.List;
public class AsyncFindBlockingTest {
public static void main (String[] args) {
/***************************************/
/**************** Config ***************/
/***************************************/
String host = "10.0.1.98:27017";
String user = "admin";
String password = "";
String authDB = "admin";
String queryDB = "apigw";
String collection = "test_async";
int batchSize = 100;
int maxQueueSize = 100;
int maxPoolSize = 10;
int maxConnectionCount = 10;
MongoClientConfiguration clientConf = new MongoClientConfiguration();
clientConf.addServer(host);
clientConf.setMaxConnectionCount(maxConnectionCount);
ThreadPoolExecutor executor = null;
if (args.length > 0 && "pool".equalsIgnoreCase(args[0])) {
executor = new ThreadPoolExecutor (
1,
maxPoolSize,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(maxQueueSize, true),
new ThreadPoolExecutor.CallerRunsPolicy()
);
executor.prestartAllCoreThreads();
clientConf.setExecutor(executor);
} //if
Credential.Builder creds = new Credential.Builder();
creds.setUserName(user);
creds.setPassword(password.toCharArray());
creds.database(authDB);
clientConf.addCredential(creds);
MongoClient client = MongoFactory.createClient(clientConf);
MongoDatabase db = client.getDatabase(queryDB);
DocumentBuilder builder = BuilderFactory.start();
builder.add("_id", 1);
Document fields = builder.build();
Document query = BuilderFactory.start().build();
Find.Builder fbuilder = new Find.Builder(query);
fbuilder.batchSize(batchSize);
fbuilder.returnFields(fields);
MongoCollection collect = db.getCollection(collection);
final CountDownLatch latch = new CountDownLatch(1);
collect.findAsync(
new Callback<MongoIterator<Document>> () {
public void callback (MongoIterator<Document> v) {
try {
List data = new ArrayList();
if (v != null) {
while (v.hasNext()) {
data.add(v.next());
System.out.print("total: " + (data.size()) + " ==> more? ");
System.out.print("" + (v.hasNext()) + "\n");
}
System.out.println("===== done processing resultset =====");
}
} finally {
if (v != null) v.close();
latch.countDown();
}
}
public void exception (Throwable t) {
System.out.println("got error for find async " + t.getMessage());
}
},
fbuilder.build()
);
try {
latch.await();
if (executor != null) {
if (!executor.isTerminating()) executor.shutdownNow();
executor.awaitTermination(60, TimeUnit.SECONDS);
}
System.out.println("done.");
} catch (Exception ex) {
System.out.println("Error while waiting for execution to finish: " + ex.getMessage());
}
} //main
} //AsyncFindBlockingTest
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment