Created November 20, 2013 11:04
-
-
Save beiske/7561401 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters.
// Build definition for the reindexer.
// Settings are separated by blank lines: sbt releases before 0.13.7 require this in .sbt files.

name := "reindexer"

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.2"

libraryDependencies += "no.found.elasticsearch" % "elasticsearch-transport-module" % "0.8.0-0905"

// `%%` appends the Scala binary version (here `_2.10`), so the artifact follows scalaVersion.
libraryDependencies += "com.sksamuel.elastic4s" %% "elastic4s" % "0.90.5.2"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import scala.collection.immutable.Stream.consWrapper | |
| import scala.concurrent.Await | |
| import scala.concurrent.ExecutionContext.Implicits.global | |
| import scala.concurrent.Future | |
| import scala.concurrent.duration.Duration | |
| import scala.util.Failure | |
| import scala.util.Success | |
| import org.elasticsearch.action.search.SearchResponse | |
| import org.elasticsearch.client.transport.TransportClient | |
| import org.elasticsearch.common.settings.ImmutableSettings | |
| import org.elasticsearch.common.transport.InetSocketTransportAddress | |
| import com.sksamuel.elastic4s.ElasticClient | |
| import com.sksamuel.elastic4s.ElasticDsl._ | |
| import com.sksamuel.elastic4s.SearchType.Scan | |
| import com.sksamuel.elastic4s.source.Source | |
/**
 * Reindexing script: reads every document of `<from-index>` with a scan/scroll search and
 * writes each hit's source into `<to-index>/<type>`, issuing one bulk request per scroll page.
 *
 * Connects through the Found transport module. Fill in the `<...>` placeholders
 * (API key, cluster name, host, index names) before running.
 */
object Reindexer extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  // Transport settings for the Found-hosted cluster.
  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "<api_key>")
    .put("cluster.name", "<cluster_name>")
    .put("client.transport.ignore_cluster_name", false)
    .build()

  val client = {
    val address = new InetSocketTransportAddress("<cluster-name>-<region>.foundcluster.com", 9343)
    ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
  }

  /**
   * Returns the scroll pages of a scan search over `<from-index>` as a lazy stream.
   *
   * The stream starts from the first scroll fetch, not from the initial search response.
   * A Scan search's initial response is expected to carry only a scroll id.
   * Each page is awaited when the stream reaches it, and the stream ends at the first
   * page that has no hits.
   */
  def getData(): Stream[SearchResponse] = {
    val res = client.execute {
      search in "<from-index>" searchType Scan scroll "10m" size 500
    }
    println("Executed first query")

    // Fetches the page that follows `previous`, using the scroll id `previous` returned.
    def fetch(previous: Future[SearchResponse]): Future[SearchResponse] =
      previous.flatMap(page => client.searchScroll(page.getScrollId(), "10m"))

    def toStream(current: Future[SearchResponse]): Stream[SearchResponse] = {
      val page = Await.result(current, Duration.Inf)
      if (page.getHits().getHits().length > 0) page #:: toStream(fetch(current))
      else Stream.empty
    }

    toStream(fetch(res))
  }

  try {
    // NOTE(review): Stream memoizes every page it produces, and this val keeps the head
    // reachable until the run ends. For very large indices, an Iterator-based pipeline
    // would bound memory.
    val sourceStream = getData()

    // Number of documents seen before each page. With the Scan search type, `size` applies
    // per shard, so a page can hold more than 500 hits. Count the actual hits instead of
    // assuming a fixed page size.
    val documentOffsets = sourceStream.scanLeft(0)(_ + _.getHits().getHits().length)

    val bulkResults = sourceStream.zip(documentOffsets).map {
      case (result, i) =>
        val requests = result.getHits().getHits().map(_.getSourceAsString()).map {
          s => index into "<to-index>/<type>" source new StringSource(s)
        }
        // `andThen` (unlike `onComplete`) yields a future that completes only after the
        // logging callback has run. Awaiting those futures guarantees that every batch
        // is reported before "Done".
        client.bulk(requests: _*).andThen {
          case Success(r) =>
            if (r.hasFailures()) {
              println(s"Indexing failure batch($i): ${r.buildFailureMessage()}")
            } else {
              println(s"Indexing success batch($i): [${r.getTook()}], [${r.getItems().length}]")
            }
          case Failure(e) =>
            println(s"Indexing transport error batch($i): ${e.getLocalizedMessage()}")
        }
    }

    Await.result(Future.sequence(bulkResults), Duration.Inf)
  } finally {
    // Release the transport client even when a scroll or bulk request fails.
    client.close()
  }
  println("Done")

  /** Wraps a raw JSON document so it can be passed as an elastic4s index source. */
  class StringSource(override val json: String) extends Source
}
Author
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
This is a slightly stripped-down version of the reindexer described in http://www.found.no/foundation/city-bikes-reindexing-for-filters/. It also shows how to combine the elasticsearch-transport-module with elastic4s.