Skip to content

Instantly share code, notes, and snippets.

@beiske
Created September 24, 2014 16:08
Show Gist options
  • Save beiske/c784ed0cd5247aa58e07 to your computer and use it in GitHub Desktop.
Save beiske/c784ed0cd5247aa58e07 to your computer and use it in GitHub Desktop.
BadMappingsDemo
import java.nio.file.Files
import scala.concurrent.duration._
import java.nio.file.Paths
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import scala.concurrent.ExecutionContext
import java.util.zip.ZipEntry
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.Duration.Infinite
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.Executors
import org.elasticsearch.action.bulk.BulkResponse
import com.sksamuel.elastic4s.SearchType
import com.sksamuel.elastic4s.SearchType
import org.elasticsearch.search.aggregations.bucket.terms.Terms
import org.elasticsearch.search.aggregations.Aggregation
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregator.SingleValue
import org.elasticsearch.search.aggregations.metrics.sum.Sum
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order
import scala.concurrent.Promise
import java.util.Calendar
import java.util.Date
import org.elasticsearch.common.joda.time.DateTime
object BadMappingsIndexer extends App {
val bulkSize = 1000
val settings = ImmutableSettings.settingsBuilder()
.put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
.put("transport.found.api-key", "<api-key>")
.put("cluster.name", "<cluster_name>")
.put("client.transport.ignore_cluster_name", false)
.put("client.transport.nodes_sampler_interval", "30s")
.put("client.transport.ping_timeout", "30s")
.build();
val client = {
val address = new InetSocketTransportAddress("<cluster_name>-eu-west-1.foundcluster.com", 9343);
ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
}
implicit val timeout = 10 minutes
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
def push() = {
println("Waiting for clusterstatus yellow.")
val clusterStatus = client.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
println(s"Got clusterstatus $clusterStatus")
if (clusterStatus != "YELLOW" & clusterStatus != "GREEN") {
throw new Error("Bad cluster status")
}
val bulkLoads = (1 to 1000000).grouped(bulkSize).map {
batch =>
println("Creating bulk")
client.bulk(
batch.map(i => {
index into "test" -> "sometype" fields {
i.toString -> i*i
}
}).toSeq: _*)
}.toStream
val concurrentBulks = 2
def onBulkSuccess(bulk: Future[BulkResponse]) {
bulk.onSuccess {
case bulkResult => {
if (bulkResult.hasFailures()) {
for (result <- bulkResult.getItems()) {
if (result.isFailed()) {
println(s"Failed response: [${result.getFailure()}][${result.getFailureMessage()}]")
}
}
} else {
println("Bulkresult successful!")
}
}
}
}
bulkLoads.take(concurrentBulks - 1).foreach(onBulkSuccess)
for (activeBulks <- bulkLoads.sliding(concurrentBulks)) {
onBulkSuccess(activeBulks.last)
activeBulks.head.await
}
Future.sequence(bulkLoads)
}
val allBulks = push()
val failureCount = (allBulks.map(_.map(_.asScala.count(_.isFailed())))).map(_.sum)
failureCount.onSuccess {
case count: Int => {
println(s"$count failed to index")
System.exit(count)
}
}
}
@prateeka
Copy link

Please provide the build.sbt or pom.xml file for above as I am having issues with elastic4s and elastic artifacts clashing. Thanks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment