Skip to content

Instantly share code, notes, and snippets.

@beiske
Created November 20, 2013 11:04
Show Gist options
  • Select an option

  • Save beiske/7561401 to your computer and use it in GitHub Desktop.

Select an option

Save beiske/7561401 to your computer and use it in GitHub Desktop.
// Build definition for the reindexer.
// Settings are separated by blank lines, which sbt releases before 0.13.7 require in .sbt files.

name := "reindexer"

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.2"

libraryDependencies ++= Seq(
  // Found's Elasticsearch transport module.
  "no.found.elasticsearch" % "elasticsearch-transport-module" % "0.8.0-0905",
  // %% appends the Scala binary version (_2.10 for scalaVersion 2.10.2), so the artifact
  // stays in sync with scalaVersion instead of hard-coding "elastic4s_2.10".
  "com.sksamuel.elastic4s" %% "elastic4s" % "0.90.5.2"
)
import scala.collection.immutable.Stream.consWrapper
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.Failure
import scala.util.Success
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.SearchType.Scan
import com.sksamuel.elastic4s.source.Source
/**
 * Copies every document from `<from-index>` into `<to-index>/<type>` on a Found-hosted cluster.
 *
 * Documents are read page by page with a scan/scroll search, and each page is written back
 * with one bulk request. Per-batch results are printed to stdout. The client is always closed
 * at the end, even when scrolling fails.
 */
object Reindexer extends App {
  // The implicit execution context (ExecutionContext.Implicits.global) comes from the
  // file-level import, so it is not imported again here.

  /** Client settings: Found's Netty transport module, its API key, and the target cluster name. */
  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "<api_key>")
    .put("cluster.name", "<cluster_name>")
    .put("client.transport.ignore_cluster_name", false)
    .build()

  /** elastic4s client wrapping a Java TransportClient pointed at the cluster endpoint. */
  val client = {
    val address = new InetSocketTransportAddress("<cluster-name>-<region>.foundcluster.com", 9343)
    ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
  }

  /**
   * Lazily pages through `<from-index>` with a scan/scroll search.
   *
   * Each element is one scroll page. Pages are fetched with a blocking wait as the stream is
   * forced, and the stream ends at the first page that contains no hits.
   *
   * NOTE: a `Stream` memoizes the elements it has produced. Callers should traverse it
   * without keeping a reference to its head (e.g. via `.iterator`); otherwise every fetched
   * page stays in memory.
   */
  def getData(): Stream[SearchResponse] = {
    val initial = client.execute {
      search in "<from-index>" searchType Scan scroll "10m" size 500
    }
    println("Executed first query")

    // Requests the next page using the scroll id carried by the previous response.
    def fetch(previous: Future[SearchResponse]): Future[SearchResponse] =
      previous.flatMap(page => client.searchScroll(page.getScrollId(), "10m"))

    def toStream(current: Future[SearchResponse]): Stream[SearchResponse] = {
      val page = Await.result(current, Duration.Inf)
      if (page.getHits().getHits().length > 0) page #:: toStream(fetch(current))
      else Stream.empty
    }

    // The documents come from the scroll calls, so the stream starts at the first scroll
    // page rather than at the initial scan response.
    toStream(fetch(initial))
  }

  /**
   * Infinite stream 0, 500, 1000, ... used to label batches in the log output.
   *
   * NOTE(review): this steps by the requested scroll `size`. For scan searches that size is
   * applied per shard, so the label is only an approximate document offset. Confirm.
   */
  def documentCounts(i: Int): Stream[Int] = i #:: documentCounts(i + 500)

  /**
   * Bulk-indexes the hits of one scroll page into `<to-index>/<type>` and logs the outcome.
   *
   * The returned future completes only after the log line has been printed, and it never
   * fails. A transport error is logged and then recovered, so one bad batch neither aborts
   * the wait for the remaining batches nor skips closing the client.
   *
   * @param page one scroll page of source documents
   * @param i    label printed for this batch (see `documentCounts`)
   */
  private def indexBatch(page: SearchResponse, i: Int): Future[Unit] = {
    val requests = page.getHits().getHits().map(_.getSourceAsString()).map {
      s => index into "<to-index>/<type>" source new StringSource(s)
    }
    client.bulk(requests: _*)
      .andThen {
        case Success(r) =>
          if (r.hasFailures()) {
            println(s"Indexing failure batch($i): ${r.buildFailureMessage()}")
          } else {
            println(s"Indexing success batch($i): [${r.getTook()}], [${r.getItems().length}]")
          }
        case Failure(e) =>
          println(s"Indexing transport error batch($i): ${e.getLocalizedMessage()}")
      }
      .map(_ => ())
      .recover { case scala.util.control.NonFatal(_) => () }
  }

  try {
    // Traverse through iterators inside this block instead of storing the stream in a field.
    // A field would pin the stream head and, through memoization, every page fetched so far.
    // NOTE(review): each page's bulk request is submitted as soon as the page arrives, with no
    // limit on how many are in flight. If the target cluster rejects bulks under load,
    // consider waiting on each batch (or a small window of batches) before scrolling further.
    val pending = getData().iterator.zip(documentCounts(0).iterator).map {
      case (page, i) => indexBatch(page, i)
    }.toList
    Await.result(Future.sequence(pending), Duration.Inf)
  } finally {
    // Always release the transport client, even if scrolling failed.
    client.close()
  }
  println("Done")

  /** Adapts an already-serialized JSON document string to elastic4s' `Source`. */
  class StringSource(override val json: String) extends Source
}
@beiske
Copy link
Copy Markdown
Author

beiske commented Nov 20, 2013

This is a slightly stripped-down version of the reindexer created in http://www.found.no/foundation/city-bikes-reindexing-for-filters/. It is also an example of combining the elasticsearch-transport-module with elastic4s.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment