Skip to content

Instantly share code, notes, and snippets.

@ppat
Last active August 29, 2015 13:56
Show Gist options
  • Save ppat/8828653 to your computer and use it in GitHub Desktop.
Save ppat/8828653 to your computer and use it in GitHub Desktop.
def addToIndex(docs: Iterable[EsDoc]): Unit = {
for (doc <- docs) {
client.execute {
index into resource fields (doc.data)
}
}
client.admin.cluster.prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet
refresh(indexName)
blockUntilCount(docs.size, indexName)
client.admin.cluster.prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet
}
def resource: String = indexName + "/" + docType
private def refresh(indexes: String*) {
val i = indexes.size match {
case 0 => Seq("_all")
case _ => indexes
}
val listener = client.client.admin().indices().prepareRefresh(i: _*).execute()
listener.actionGet()
}
private def blockUntilCount(expected: Long,
index: String,
types: String*) {
var backoff = 0
var actual = 0l
while (backoff <= 64 && actual != expected) {
if (backoff > 0)
Thread.sleep(backoff * 100)
backoff = if (backoff == 0) 1 else backoff * 2
try {
actual = Await.result(client execute {
count from index types types
}, 5 seconds).getCount
} catch {
case e: IndexMissingException => 0
}
}
require(expected == actual, s"Block failed waiting on count: Expected was $expected but actual was $actual")
}
private def createIndex(): Unit = {
client.sync.execute {
create index indexName shards 1 replicas 0 mappings mappingsDef
}
client.admin.indices().prepareExists(indexName).execute().actionGet().isExists should be(true)
client.admin.indices().prepareTypesExists(indexName).setTypes(docType).execute().actionGet().isExists should be(true)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment