Created September 24, 2014 16:08
Save beiske/c784ed0cd5247aa58e07 to your computer and use it in GitHub Desktop.
BadMappingsDemo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.file.Files | |
import scala.concurrent.duration._ | |
import java.nio.file.Paths | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.ExecutionContext | |
import org.elasticsearch.client.transport.TransportClient | |
import org.elasticsearch.common.settings.ImmutableSettings | |
import org.elasticsearch.common.transport.InetSocketTransportAddress | |
import com.sksamuel.elastic4s.ElasticClient | |
import com.sksamuel.elastic4s.ElasticDsl._ | |
import scala.concurrent.ExecutionContext | |
import java.util.zip.ZipEntry | |
import scala.concurrent.Future | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.duration.Duration.Infinite | |
import java.util.concurrent.ThreadPoolExecutor | |
import java.util.concurrent.Executors | |
import org.elasticsearch.action.bulk.BulkResponse | |
import com.sksamuel.elastic4s.SearchType | |
import com.sksamuel.elastic4s.SearchType | |
import org.elasticsearch.search.aggregations.bucket.terms.Terms | |
import org.elasticsearch.search.aggregations.Aggregation | |
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator.SingleValue | |
import org.elasticsearch.search.aggregations.metrics.sum.Sum | |
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order | |
import scala.concurrent.Promise | |
import java.util.Calendar | |
import java.util.Date | |
import org.elasticsearch.common.joda.time.DateTime | |
/**
 * Indexes the integers 1 to 1000000 into `test/sometype`, one bulk request per
 * `bulkSize` documents, with at most two bulk requests outstanding at a time.
 *
 * Each document has a single field whose name is the number itself
 * (`"42" -> 1764`), so every document introduces a distinct field name -- the
 * "bad mapping" this demo is named for. NOTE(review): assumes dynamic mapping
 * is enabled on the target index, so each new name is added to the mapping.
 *
 * Exit status: 0 when every document was indexed; otherwise the number of
 * failed documents capped at 255, or 255 if the run aborted with an exception.
 */
object BadMappingsIndexer extends App {
  import scala.util.{Failure, Success}
  import scala.util.control.NonFatal

  /** Number of documents sent per bulk request. */
  val bulkSize = 1000

  // Transport settings for a Found-hosted cluster; replace the <api-key> and
  // <cluster_name> placeholders before running.
  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "<api-key>")
    .put("cluster.name", "<cluster_name>")
    .put("client.transport.ignore_cluster_name", false)
    .put("client.transport.nodes_sampler_interval", "30s")
    .put("client.transport.ping_timeout", "30s")
    .build()

  /** elastic4s client wrapping a TransportClient bound to the cluster's single address. */
  val client = {
    val address = new InetSocketTransportAddress("<cluster_name>-eu-west-1.foundcluster.com", 9343)
    ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
  }

  /** Implicit upper bound picked up by every `.await` below. */
  implicit val timeout: FiniteDuration = 10.minutes

  // Runs the Future callbacks. Executors.newFixedThreadPool creates non-daemon
  // threads, so the JVM only terminates via the System.exit at the end.
  implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  /**
   * Waits for the cluster to reach at least yellow status, then sends all
   * documents, keeping at most `concurrentBulks` bulk requests outstanding.
   *
   * @return a future completing with every bulk response once all bulks finish
   * @throws IllegalStateException if the reported cluster status is neither
   *         YELLOW nor GREEN; any exception raised while awaiting a bulk
   *         (timeout or request failure) also propagates
   */
  def push(): Future[Stream[BulkResponse]] = {
    println("Waiting for clusterstatus yellow.")
    val clusterStatus = client.client.admin().cluster().prepareHealth()
      .setWaitForYellowStatus().execute().actionGet().getStatus().name()
    println(s"Got clusterstatus $clusterStatus")
    if (clusterStatus != "YELLOW" && clusterStatus != "GREEN") {
      throw new IllegalStateException(s"Bad cluster status: $clusterStatus")
    }

    // One bulk request per batch. Apart from its head, a Stream evaluates lazily
    // and memoizes, so each request is sent exactly once, when first reached.
    val bulkLoads = (1 to 1000000).grouped(bulkSize).map { batch =>
      println("Creating bulk")
      client.bulk(
        batch.map { i =>
          index into "test" -> "sometype" fields {
            // i * i overflows Int for i > 46340; widen to Long so the value is exact.
            i.toString -> i.toLong * i
          }
        }: _*)
    }.toStream

    val concurrentBulks = 2

    // Logs per-item failures (or overall success) once a bulk completes successfully.
    def onBulkSuccess(bulk: Future[BulkResponse]): Unit =
      bulk.foreach { bulkResult =>
        if (bulkResult.hasFailures()) {
          for (result <- bulkResult.getItems() if result.isFailed()) {
            println(s"Failed response: [${result.getFailure()}][${result.getFailureMessage()}]")
          }
        } else {
          println("Bulkresult successful!")
        }
      }

    // Each window's newest element forces the next request to be sent, and awaiting
    // the oldest one bounds how many are outstanding. The first
    // `concurrentBulks - 1` futures are never a window's last element, so their
    // callbacks are registered up front.
    bulkLoads.take(concurrentBulks - 1).foreach(onBulkSuccess)
    for (activeBulks <- bulkLoads.sliding(concurrentBulks)) {
      onBulkSuccess(activeBulks.last)
      activeBulks.head.await
    }
    Future.sequence(bulkLoads)
  }

  /** All bulk responses; an exception thrown synchronously by push() becomes a failed future. */
  val allBulks: Future[Stream[BulkResponse]] =
    try push()
    catch { case NonFatal(e) => Future.failed(e) }

  /** Total number of documents whose index request failed, summed over all bulks. */
  val failureCount = allBulks.map(_.map(_.asScala.count(_.isFailed())).sum)

  // Exit on every outcome. Handling only success would leave a failed run hanging
  // forever on the non-daemon pool threads.
  failureCount.onComplete { outcome =>
    val exitCode = outcome match {
      case Success(count) =>
        println(s"$count failed to index")
        // Exit statuses are truncated to 8 bits; cap so e.g. 256 failures cannot read as 0.
        math.min(count, 255)
      case Failure(e) =>
        println(s"Indexing aborted: $e")
        255
    }
    try client.close()
    finally System.exit(exitCode)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
Please provide the build.sbt or pom.xml file for the code above; I am having issues with the elastic4s and Elasticsearch artifacts clashing. Thanks.