Skip to content

Instantly share code, notes, and snippets.

@magro
Last active August 29, 2015 14:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save magro/42c32863178bc9cbc126 to your computer and use it in GitHub Desktop.
Save magro/42c32863178bc9cbc126 to your computer and use it in GitHub Desktop.
A benchmark for solrs that allows comparing it with solrj. It can be copied into the solrs project as `src/test/scala/io/ino/solrs/Benchmark.scala`; build.sbt additionally needs `connectInput in (Test,run) := true`. Run it from within sbt via `test:runMain io.ino.solrs.Benchmark 4 solrj` (or pass "solrs" instead of "solrj" to benchmark solrs).
package io.ino.solrs
import java.util.Arrays._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Executors, TimeUnit}
import io.ino.solrs.SolrUtils._
import org.apache.curator.test.TestingServer
import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.CloudSolrClient
import org.apache.solr.client.solrj.response.QueryResponse
import org.slf4j.LoggerFactory
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.{postfixOps, reflectiveCalls}
import scala.util.{Failure, Success, Try}
/**
 * Query benchmark that runs the same Solr query either through solrs' `AsyncSolrClient` or
 * through SolrJ's `CloudSolrClient`, so that the throughput of both clients can be compared.
 *
 * Flow: start a ZooKeeper `TestingServer` and two `SolrRunner` instances, index two documents,
 * run `numWarmupQueries` sequential warmup queries, then let `numUsers` concurrent "users" query
 * in a loop until enter is pressed, and finally log the success and error rates.
 *
 * Arguments: `[numUsers] [solrj|solrs]`. A missing, non-numeric or non-positive `numUsers` falls
 * back to `defaultNumUsers`; SolrJ is used only if "solrj" is passed, otherwise solrs is used.
 */
object Benchmark extends App {

  import scala.util.control.NonFatal

  val logger = LoggerFactory.getLogger(getClass)

  val defaultNumUsers = 1
  val numWarmupQueries = 2000

  // The benchmark query ("cat:cat1") must match exactly the two documents indexed by addDocs().
  private val expectedNumFound = 2L

  /** Number of concurrent users, taken from the first argument if it is a positive integer. */
  def numUsers: Int =
    args.headOption
      .flatMap(arg => Try(arg.toInt).toOption)
      .filter(_ > 0) // a fixed thread pool cannot be created with 0 or fewer threads
      .getOrElse(defaultNumUsers)

  /** `true` if "solrj" was passed as the only argument or as the second argument. */
  def useSolrJ: Boolean =
    (args.length == 1 && args(0) == "solrj") || (args.length > 1 && args(1) == "solrj")

  val numSuccess = new AtomicInteger(0)
  val numFailure = new AtomicInteger(0)

  // Flipped to false once the user stops the benchmark; the query loops check it after each query.
  val running = new AtomicBoolean(true)

  val zk = new TestingServer()
  zk.start()

  val solrRunners = List(
    SolrRunner.start(18888, Some(ZooKeeperOptions(zk.getConnectString, bootstrapConfig = Some("collection1")))),
    SolrRunner.start(18889, Some(ZooKeeperOptions(zk.getConnectString)))
  )

  /**
   * The client under test, reduced to the two operations the benchmark needs so that the solrs
   * and the SolrJ client can be used interchangeably.
   */
  lazy val solrClient: {
    def shutdown(): Unit
    def query(q: SolrQuery): Future[QueryResponse]
  } = {
    if (useSolrJ) {
      logger.info(s"Starting with solrj SolrJ, running $numUsers queries in parallel")
      new {
        val solr = new CloudSolrClient(zk.getConnectString)
        solr.setDefaultCollection("collection1")

        // SolrJ queries synchronously. Capture thrown exceptions as a failed Future so that they
        // are counted as failures by submitQuery() instead of escaping into the calling thread.
        def query(q: SolrQuery): Future[QueryResponse] = Try(solr.query(q)) match {
          case Success(response) => Future.successful(response)
          case Failure(e) => Future.failed(e)
        }

        def shutdown(): Unit = solr.shutdown()
      }
    } else {
      logger.info(s"Starting with solrs SolrS, running $numUsers queries in parallel")
      val solrServers = new CloudSolrServers(
        zk.getConnectString,
        clusterStateUpdateInterval = 100.millis,
        defaultCollection = Some("collection1"))
      // We need to configure a retry policy as otherwise requests fail because server status is not
      // updated fast enough...
      AsyncSolrClient.Builder(RoundRobinLB(solrServers)).withRetryPolicy(RetryPolicy.TryAvailableServers).build
    }
  }

  addDocs()

  val query = new SolrQuery()
  query.setQuery("cat:cat1")

  // Each user occupies one thread of this pool, blocked on its own query loop.
  val ex = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(numUsers))

  import scala.concurrent.ExecutionContext.Implicits.global

  // Warmup: run the queries strictly one after another. A failed warmup query is already logged
  // and counted by submitQuery(), it must not abort the remaining warmup queries.
  logger.info("Warming up...")
  await((1 to numWarmupQueries).foldLeft(Future.successful(())) { (previous, _) =>
    previous.flatMap(_ => submitQuery().map(_ => ()).recover { case NonFatal(_) => () })
  })

  // some time, perhaps JIT wants to do s.th....
  Thread.sleep(1000)

  // The warmup went through submitQuery() and therefore changed the counters. Reset them so that
  // the reported rates only cover queries executed within the measured time span.
  numSuccess.set(0)
  numFailure.set(0)

  logger.info("Starting benchmark...")
  val start = System.currentTimeMillis()

  (1 to numUsers).foreach { _ =>
    ex.submit(new Runnable {
      override def run(): Unit = Await.ready(loopQuery(), Duration.Inf)
    })
  }

  /**
   * Submits queries one after another for as long as `running` is set.
   *
   * A failed query does not end the loop, otherwise a single error would silently remove this
   * user's load for the rest of the benchmark.
   *
   * @return the future of the last submitted query (successful or failed)
   */
  def loopQuery(): Future[QueryResponse] = {
    val attempt = submitQuery()
    attempt
      .map(_ => ())
      .recover { case NonFatal(_) => () }
      .flatMap(_ => if (running.get()) loopQuery() else attempt)
  }

  /**
   * Runs the benchmark query once and records the outcome in `numSuccess` / `numFailure`.
   *
   * A response with an unexpected number of hits is logged and counted as failure.
   *
   * @return the future response as returned by the client
   */
  def submitQuery(): Future[QueryResponse] = {
    val futureResponse = solrClient.query(query)
    futureResponse.onComplete {
      case Success(resp) if resp.getResults.getNumFound == expectedNumFound =>
        numSuccess.incrementAndGet()
      case Success(resp) =>
        logger.error(s"Query returned ${resp.getResults.getNumFound} docs, expected $expectedNumFound.")
        numFailure.incrementAndGet()
      case Failure(e) =>
        logger.error(s"Query failed.", e)
        numFailure.incrementAndGet()
    }
    futureResponse
  }

  /** Indexes and commits the two documents (category "cat1") the benchmark query searches for. */
  private def addDocs(): Unit = {
    val solr = new CloudSolrClient(zk.getConnectString)
    try {
      solr.setDefaultCollection("collection1")
      val doc1 = newInputDoc("id1", "doc1", "cat1", 10)
      val doc2 = newInputDoc("id2", "doc2", "cat1", 20)
      solr.add(asList(doc1, doc2))
      solr.commit()
    } finally {
      // Release the client even if indexing fails.
      solr.shutdown()
    }
  }

  /** Blocks until the given future is completed, without rethrowing its failure. */
  private def await(f: Future[_]): Unit = {
    Await.ready(f, Duration.Inf)
    ()
  }

  logger.info("Press enter to stop...")
  Console.readLine()
  //Thread.sleep(10000L)
  running.set(false)

  // Take the measurements right after stopping so that they belong to the same point in time.
  val duration = System.currentTimeMillis() - start
  val successes = numSuccess.get()
  val failures = numFailure.get()

  // Fractional seconds: integer division would truncate the rate and divide by zero for runs
  // shorter than one second.
  val seconds = math.max(duration, 1L) / 1000.0
  logger.info(f"Success rate: ${successes / seconds}%.1f req/sec")
  logger.info(f"Error rate: ${failures / seconds}%.1f req/sec")

  ex.shutdown()
  if (!ex.awaitTermination(5, TimeUnit.SECONDS)) {
    logger.warn("Query loops did not terminate within 5 seconds, shutting down anyway.")
  }
  solrClient.shutdown()
  solrRunners.foreach(_.stop())
  zk.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment