Last active
August 29, 2015 14:26
-
-
Save magro/42c32863178bc9cbc126 to your computer and use it in GitHub Desktop.
A benchmark for solrs, allows to compare it with solrj. Can be copied in the solrs project to `src/test/scala/io/ino/solrs/Benchmark.scala`, build.sbt additionally needs `connectInput in (Test,run) := true`. Can be run from within sbt via `test:runMain io.ino.solrs.Benchmark 4 solrj` (or alternatively "solrs").
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.ino.solrs | |
import java.util.Arrays._ | |
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} | |
import java.util.concurrent.{Executors, TimeUnit} | |
import io.ino.solrs.SolrUtils._ | |
import org.apache.curator.test.TestingServer | |
import org.apache.solr.client.solrj.SolrQuery | |
import org.apache.solr.client.solrj.impl.CloudSolrClient | |
import org.apache.solr.client.solrj.response.QueryResponse | |
import org.slf4j.LoggerFactory | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
import scala.language.{postfixOps, reflectiveCalls} | |
import scala.util.{Failure, Success, Try} | |
object Benchmark extends App { | |
val logger = LoggerFactory.getLogger(getClass) | |
val defaultNumUsers = 1 | |
val numWarmupQueries = 2000 | |
def numUsers: Int = if(args.nonEmpty) Try(args(0).toInt).getOrElse(defaultNumUsers) else defaultNumUsers | |
def useSolrJ: Boolean = args.length == 1 && args(0) == "solrj" || args.length > 1 && args(1) == "solrj" | |
val numSuccess = new AtomicInteger(0) | |
val numFailure = new AtomicInteger(0) | |
val running = new AtomicBoolean(true) | |
val zk = new TestingServer() | |
zk.start() | |
val solrRunners = List( | |
SolrRunner.start(18888, Some(ZooKeeperOptions(zk.getConnectString, bootstrapConfig = Some("collection1")))), | |
SolrRunner.start(18889, Some(ZooKeeperOptions(zk.getConnectString))) | |
) | |
lazy val solrClient: { | |
def shutdown(): Unit | |
def query(q: SolrQuery): Future[QueryResponse] | |
} = { | |
if(useSolrJ) { | |
logger.info(s"Starting with solrj SolrJ, running $numUsers queries in parallel") | |
new { | |
val solr = new CloudSolrClient(zk.getConnectString) | |
solr.setDefaultCollection("collection1") | |
def query(q: SolrQuery): Future[QueryResponse] = Future.successful(solr.query(q)) | |
def shutdown() = solr.shutdown() | |
} | |
} else { | |
logger.info(s"Starting with solrs SolrS, running $numUsers queries in parallel") | |
val solrServers = new CloudSolrServers( | |
zk.getConnectString, | |
clusterStateUpdateInterval = 100 millis, | |
defaultCollection = Some("collection1")) | |
// We need to configure a retry policy as otherwise requests fail because server status is not | |
// updated fast enough... | |
AsyncSolrClient.Builder(RoundRobinLB(solrServers)).withRetryPolicy(RetryPolicy.TryAvailableServers).build | |
} | |
} | |
addDocs() | |
val query = new SolrQuery() | |
query.setQuery("cat:cat1") | |
val ex = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numUsers)) | |
import scala.concurrent.ExecutionContext.Implicits.global | |
// some warmup | |
logger.info("Warming up...") | |
await((1 to numWarmupQueries).foldLeft(Future.successful[QueryResponse](null)) { (res, i) => | |
res.flatMap(_ => submitQuery()) | |
}) | |
// some time, perhaps JIT wants to do s.th.... | |
Thread.sleep(1000) | |
logger.info("Starting benchmark...") | |
val start = System.currentTimeMillis() | |
for(i <- 1 to numUsers) { | |
ex.submit(new Runnable { | |
override def run(): Unit = Await.ready(loopQuery(), Duration.Inf) | |
}) | |
} | |
def loopQuery(): Future[QueryResponse] = { | |
submitQuery().flatMap { resp => | |
if(running.get()) { | |
loopQuery() | |
} else { | |
Future.successful(resp) | |
} | |
} | |
} | |
def submitQuery(): Future[QueryResponse] = { | |
val futureResponse = solrClient.query(query) | |
futureResponse.onComplete { | |
case Success(resp) => | |
assert(resp.getResults.getNumFound == 2) | |
numSuccess.incrementAndGet() | |
case Failure(e) => | |
logger.error(s"Query failed.", e) | |
numFailure.incrementAndGet() | |
} | |
futureResponse | |
} | |
private def addDocs(): Unit = { | |
val solr = new CloudSolrClient(zk.getConnectString) | |
solr.setDefaultCollection("collection1") | |
val doc1 = newInputDoc("id1", "doc1", "cat1", 10) | |
val doc2 = newInputDoc("id2", "doc2", "cat1", 20) | |
solr.add(asList(doc1, doc2)) | |
solr.commit() | |
solr.shutdown() | |
} | |
private def await(f: Future[_]) = Await.ready(f, Duration.Inf) | |
logger.info("Press enter to stop...") | |
Console.readLine() | |
//Thread.sleep(10000L) | |
running.set(false) | |
val duration = System.currentTimeMillis() - start | |
logger.info(s"Success rate: ${numSuccess.get() / (duration / 1000)} req/sec") | |
logger.info(s"Error rate: ${numFailure.get() / (duration / 1000)} req/sec") | |
ex.shutdown() | |
ex.awaitTermination(5, TimeUnit.SECONDS) | |
solrClient.shutdown() | |
solrRunners.foreach(_.stop()) | |
zk.close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment