Skip to content

Instantly share code, notes, and snippets.

@johntbush
Last active June 13, 2016 17:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johntbush/a75836f536cba4c0d83ae9cea993c3b5 to your computer and use it in GitHub Desktop.
Save johntbush/a75836f536cba4c0d83ae9cea993c3b5 to your computer and use it in GitHub Desktop.
scala scripting
#!/usr/bin/env scalas
/***
scalaVersion := "2.11.7"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://repo.typesafe.com/typesafe/releases"))(Resolver.ivyStylePatterns)
resolvers += "Your Artifactory" at "http://yourstuff.com/artifactory/repo"
resolvers += "mandubian maven bintray" at "http://dl.bintray.com/mandubian/maven"
resolvers += Resolver.mavenLocal
libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.7"
libraryDependencies += "ch.qos.logback" % "logback-core" % "1.1.7"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.21"
libraryDependencies += "com.trax.platform" % "trax-elasticsearch-loader" % "1.3.43"
libraryDependencies += "com.trax.platform" % "trax-platform-utils" % "1.3.7"
*/
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.{ElasticClient, SearchType}
import com.trax.common.log.LogbackUtils
import com.trax.elasticsearch.EsClient
import com.trax.platform.util.scala.RestUtils
import com.typesafe.scalalogging.{Logger, LazyLogging}
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.slf4j.LoggerFactory
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
// Set the log level to INFO through the project's LogbackUtils helper before anything is logged.
// NOTE(review): the null first argument presumably selects the root/default logger — confirm.
LogbackUtils.setLogLevel(null, "INFO")

// Script-wide scala-logging Logger wrapping the slf4j logger named "log".
val logger: Logger = {
  val underlying = LoggerFactory.getLogger("log")
  Logger(underlying)
}
/**
 * Drains an Elasticsearch scroll, posting one "index" message per hit.
 *
 * Fetches the next page for `scrollId` (keep-alive "1m"). An empty page means
 * the scroll is exhausted and the running count is returned. Otherwise a JSON
 * message with the keys `env_id`, `action`, `type` and `id` is built for each
 * hit and POSTed, each POST in its own `Future` on the implicit execution
 * context. The whole page is awaited (up to one hour) before recursing with the
 * scroll id of the response just received.
 *
 * Hits that carry no usable `Environment.EnvId` value are logged and skipped
 * instead of aborting the whole run with a NullPointerException or
 * NoSuchElementException; they are not included in the returned count.
 *
 * @param client   client used to fetch the scroll pages
 * @param scrollId scroll id returned by the previous search/scroll request
 * @param total    number of messages posted so far
 * @return the number of messages posted once the scroll is exhausted
 */
@tailrec
def next(client: ElasticClient, scrollId: String, total: Int): Int = {
  val resp = client.execute { search scroll scrollId keepAlive "1m" }.await
  val hits = resp.getHits.getHits
  if (hits.isEmpty) total
  else {
    val futures = hits.toSeq.flatMap { hit =>
      // The Java API may hand back null for a field the hit does not carry, and the
      // value list may be empty, so go through Option rather than `.head`.
      val envId = for {
        field  <- Option(hit.field("Environment.EnvId"))
        values <- Option(field.getValues)
        if !values.isEmpty
        first  <- Option(values.get(0))
      } yield first.toString

      if (envId.isEmpty) logger.warn(s"skipping hit ${hit.id}: no Environment.EnvId value")

      envId.map { env =>
        val msg = compact(render(("env_id" -> env) ~ ("action" -> "index") ~ ("type" -> "fb") ~ ("id" -> hit.id)))
        logger.debug(msg)
        Future {
          RestUtils.postRestContent("http://someserver/sendmessage/invoice_search", msg, "application/json")
        }
      }
    }
    // Wait for every POST of this page before requesting the next one; a failed
    // POST makes Await.result throw and stops the run.
    Await.result(Future.sequence(futures), 1.hour)
    next(client, resp.getScrollId, total + futures.size)
  }
}
// Obtain the elastic4s client from the project's EsClient helper.
val client = EsClient.getInstance("somecluster", 9300, "cluster_name").client

// Open a scan-type scroll (pages of 1000, keep-alive "1m") over type "fb" in the
// "fps-fbs*" indices, selecting documents whose FbNorm.Businessflow matches "*unknown".
// Only the two listed fields are requested for each hit.
val resp = client.execute {
  search
    .in("fps-fbs*" / "fb")
    .size(1000)
    .fields("FbNorm.Businessflow", "Environment.EnvId")
    .query(wildcardQuery("FbNorm.Businessflow", "*unknown"))
    .searchType(SearchType.Scan)
    .scroll("1m")
}.await

logger.info(s"${resp.getHits.getTotalHits} hits")

// Walk the scroll to the end, posting one message per hit, then report the count.
val docs = next(client, resp.getScrollId, 0)
logger.info(s"fixed $docs docs")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment