Skip to content

Instantly share code, notes, and snippets.

@jyemin
Created March 27, 2012 02:52
Show Gist options
  • Save jyemin/2212066 to your computer and use it in GitHub Desktop.
Save jyemin/2212066 to your computer and use it in GitHub Desktop.
Program for benchmarking upserts using the Java driver
import com.mongodb.*;
import org.bson.types.ObjectId;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Benchmark for upsert throughput and thread contention using the MongoDB Java driver.
 *
 * <p>Each job repeatedly upserts ({@code $inc} on field {@code x}) a single document of its own
 * from a worker thread of a fixed-size thread pool. After each measured run the elapsed wall-clock
 * time and the per-thread blocked/CPU statistics of the pool threads are printed to standard out.
 */
public class MongoUpsertContentionTest {
    /** Name of the scratch database; it is dropped before and after the benchmark. */
    private static final String DB_NAME = "largeConnectionPoolDB";

    /** Name of the scratch collection; it is dropped before every run. */
    private static final String COLLECTION_NAME = "largeConnectionPoolDB";

    /** Number of measured (printed) runs executed after the warm-up run. */
    private static final int MEASURED_RUNS = 5;

    /**
     * Runs one unprinted warm-up pass followed by {@link #MEASURED_RUNS} measured passes against
     * a server at 127.0.0.1:27017, printing the configuration and the results of each measured pass.
     *
     * @throws UnknownHostException if the server address cannot be resolved
     * @throws InterruptedException if the calling thread is interrupted while waiting for jobs
     */
    public static void testPoolsOfDifferentSizeWithDifferentThreadCounts() throws UnknownHostException, InterruptedException {
        // Contention and CPU timing must be enabled for printPoolThreadInfo to report real values.
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        threadMXBean.setThreadContentionMonitoringEnabled(true);
        threadMXBean.setThreadCpuTimeEnabled(true);

        int numThreads = 1;
        int poolSize = 20;
        int numJobs = numThreads;
        int numUpsertsPerJob = 200000;

        MongoOptions options = new MongoOptions();
        options.connectionsPerHost = poolSize;
        Mongo mongo = new Mongo(Arrays.asList(new ServerAddress("127.0.0.1", 27017)), options);
        try {
            mongo.getDB(DB_NAME).dropDatabase();

            System.out.println("Warming up...");
            executeParallelOperation(mongo, poolSize, poolSize, 10, true, WriteConcern.SAFE, false);

            System.out.println();
            System.out.println("num threads: " + numThreads);
            System.out.println("pool size: " + poolSize);
            System.out.println("jobs: " + numJobs);
            System.out.println("num upserts per job: " + numUpsertsPerJob);
            System.out.println();

            for (int run = 0; run < MEASURED_RUNS; run++) {
                executeParallelOperation(mongo, numThreads, numJobs, numUpsertsPerJob, true, WriteConcern.SAFE, true);
            }

            mongo.getDB(DB_NAME).dropDatabase();
        } finally {
            // Always release the client's connections, even when a run fails or is interrupted.
            mongo.close();
        }
    }

    /**
     * Drops the scratch collection and then executes a single benchmark pass against it.
     *
     * @param mongo             client used to obtain the scratch collection
     * @param numThreads        size of the worker thread pool
     * @param numJobs           number of jobs submitted to the pool
     * @param numUpserts        number of upserts each job performs
     * @param reserveConnection whether each job brackets its work with requestStart/requestDone
     * @param writeConcern      write concern passed to every upsert
     * @param printResults      whether timing and thread statistics are printed
     */
    private static void executeParallelOperation(Mongo mongo, int numThreads, int numJobs, int numUpserts,
            boolean reserveConnection, WriteConcern writeConcern, boolean printResults) throws UnknownHostException, InterruptedException {
        DBCollection coll = mongo.getDB(DB_NAME).getCollection(COLLECTION_NAME);
        coll.drop();
        doOne(numThreads, numJobs, numUpserts, coll, reserveConnection, writeConcern, printResults);
    }

    /**
     * Submits {@code numJobs} jobs to a fresh fixed-size pool, waits for all of them to finish and
     * optionally prints the results. The measured interval starts before the pool is created, so it
     * includes pool creation and job submission.
     */
    private static void doOne(int numThreads, int numJobs, final int numUpserts, final DBCollection coll, final boolean reserveConnection, WriteConcern writeConcern, boolean printResults) throws InterruptedException {
        long start = System.currentTimeMillis();
        ExecutorService es = Executors.newFixedThreadPool(numThreads);
        try {
            List<ContentionJob> jobs = new ArrayList<ContentionJob>();
            for (int x = 0; x < numJobs; x++) {
                ContentionJob job = new ContentionJob(coll, reserveConnection, numUpserts, writeConcern);
                jobs.add(job);
                es.submit(job);
            }
            for (ContentionJob job : jobs) {
                job.awaitCompletion();
            }
            if (printResults) {
                long end = System.currentTimeMillis();
                // The extra space after "true" keeps the column aligned with the longer "false".
                System.out.println("reserve: " + reserveConnection + ", " + (reserveConnection ? " " : "") + "time elapsed: " + (end - start) + " ms");
                // Statistics are collected before shutdown, while the pool threads are still alive.
                printPoolThreadInfo();
                System.out.println();
            }
        } finally {
            // Shut the pool down on every path so its non-daemon workers cannot outlive a failure.
            es.shutdown();
        }
        es.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    /**
     * Prints blocked count, blocked time, CPU time and user time (the latter two converted from
     * nanoseconds to milliseconds) for every live thread whose name starts with {@code "pool"}.
     *
     * @throws InterruptedException never thrown by the current implementation; retained so that
     *                              existing callers keep compiling
     */
    public static void printPoolThreadInfo() throws InterruptedException {
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
        for (ThreadInfo cur : threadInfos) {
            if (!cur.getThreadName().startsWith("pool")) {
                continue;
            }
            long threadId = cur.getThreadId();
            System.out.print("blocked count: " + cur.getBlockedCount());
            System.out.print(", blocked time: " + cur.getBlockedTime() + " ms");
            System.out.print(", cpu time: " + (threadMXBean.getThreadCpuTime(threadId) / 1000000) + " ms");
            System.out.print(", user time: " + (threadMXBean.getThreadUserTime(threadId) / 1000000) + " ms");
            System.out.println();
        }
    }

    /** Entry point: runs the benchmark; command-line arguments are ignored. */
    public static void main(String[] args) throws UnknownHostException, InterruptedException {
        testPoolsOfDifferentSizeWithDifferentThreadCounts();
    }

    /**
     * A unit of benchmark work: performs {@code num} upserts against one document and then
     * signals completion to any thread blocked in {@link #awaitCompletion()}.
     */
    static class ContentionJob implements Runnable {
        protected final DBCollection coll;
        private final boolean reserveConnection;
        private final int num;
        private final WriteConcern writeConcern;
        /** Completion flag; only read and written while holding this object's monitor. */
        private boolean done;

        ContentionJob(DBCollection coll, boolean reserveConnection, int num, WriteConcern writeConcern) {
            this.coll = coll;
            this.reserveConnection = reserveConnection;
            this.num = num;
            this.writeConcern = writeConcern;
        }

        /**
         * Executes the upserts, logging (not rethrowing) any {@link RuntimeException}. Completion is
         * signalled from a {@code finally} block so that waiters are released even when the work
         * fails with an exception or error.
         */
        @Override
        public void run() {
            try {
                performUpserts();
            } catch (RuntimeException e) {
                Logger.getAnonymousLogger().log(Level.INFO, "Exception", e);
            } finally {
                synchronized (this) {
                    done = true;
                    // notifyAll rather than notify: every thread waiting on this job must wake up.
                    notifyAll();
                }
            }
        }

        /** Performs the upsert loop, bracketed by requestStart/requestDone when requested. */
        private void performUpserts() {
            if (reserveConnection) {
                coll.getDB().requestStart();
            }
            try {
                DBObject obj = new BasicDBObject();
                obj.put("_id", new ObjectId());
                DBObject upsert = new BasicDBObject("$inc", new BasicDBObject("x", 1));
                for (int i = 0; i < num; i++) {
                    coll.update(obj, upsert, true, false, writeConcern);
                }
            } finally {
                if (reserveConnection) {
                    coll.getDB().requestDone();
                }
            }
        }

        /**
         * Blocks until {@link #run()} has finished.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized void awaitCompletion() throws InterruptedException {
            // Loop guards against spurious wake-ups.
            while (!done) {
                wait();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment