Skip to content

Instantly share code, notes, and snippets.

@kulikov
Created February 6, 2014 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kulikov/8844753 to your computer and use it in GitHub Desktop.
Save kulikov/8844753 to your computer and use it in GitHub Desktop.
// Scroll keep-alive passed both to the initial search and to every follow-up scroll request.
// NOTE(review): 600000 is presumably milliseconds (10 minutes) — confirm against TimeValue's constructor.
val timeout = new TimeValue(600000)

// Scrolling search over `index` / `indexType`; the page size falls back to 5000 when `size` is empty.
// NOTE(review): with SearchType.SCAN Elasticsearch is documented to ignore sorting and to return
// no hits in the initial response — confirm against the client version in use.
val requestBuilder = client
  .prepareSearch(index)
  .setTypes(indexType)
  .setSize(size.getOrElse(5000))
  .setScroll(timeout)
  .setSearchType(SearchType.SCAN)

// Optional clauses are applied only when a value is present.
query foreach requestBuilder.setQuery
filter foreach requestBuilder.setFilter

// Register every requested field; `foreach` (not `map`) because only the side effect matters.
fields foreach (names => names foreach requestBuilder.addField)
// A present-but-empty field selection switches the request to "no fields".
if (fields.exists(_.isEmpty)) requestBuilder.setNoFields()

sort foreach requestBuilder.addSort

log.info("Search query: {}", requestBuilder)

// Blocks until the initial search response is available.
val response = requestBuilder.execute().actionGet()
log.info("Search result: {} total hits", response.getHits.totalHits())

// Walk the scroll pages, feeding every hit to `func`.
scrollProcess(response, timeout, func)
/**
 * Walks a scroll cursor page by page, passing every hit to `func`.
 *
 * Each step requests the next page using the scroll id of `scrollResp`; the hits
 * contained in `scrollResp` itself are never passed to `func`. Iteration stops at
 * the first page that contains zero hits.
 *
 * If fetching a page throws an `Exception`, the error is logged, the thread sleeps
 * for 500 ms and the same request is retried with the unchanged `scrollResp`.
 * NOTE(review): there is no retry limit, so a persistent failure loops forever.
 *
 * `func` is invoked outside the `try`, so exceptions it throws propagate to the caller.
 *
 * @param scrollResp response whose scroll id is used to fetch the next page
 * @param timeout    scroll keep-alive sent with every scroll request
 * @param func       callback invoked once for each hit of each fetched page
 */
@annotation.tailrec
final def scrollProcess(scrollResp: SearchResponse, timeout: TimeValue, func: SearchHit ⇒ Unit): Unit = {
  // None means the fetch failed and must be retried with the same scroll id.
  val nextPage: Option[SearchResponse] =
    try {
      Some(
        client.prepareSearchScroll(scrollResp.getScrollId)
          .setScroll(timeout)
          .execute()
          .actionGet()
      )
    } catch {
      case e: Exception ⇒
        log.error("Search error: {}", e)
        Thread.sleep(500) // brief back-off before retrying
        None
    }

  // The recursive calls stay outside the try block so they remain in tail position.
  nextPage match {
    case None ⇒
      scrollProcess(scrollResp, timeout, func) // try again
    case Some(page) ⇒
      val hits = page.getHits.hits()
      hits.foreach(func)
      // An empty page marks the end of the scroll.
      if (hits.length != 0) scrollProcess(page, timeout, func)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment