Last active
June 13, 2016 17:43
-
-
Save johntbush/a75836f536cba4c0d83ae9cea993c3b5 to your computer and use it in GitHub Desktop.
scala scripting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env scalas
/***
scalaVersion := "2.11.7"
resolvers += Resolver.url("typesafe-ivy-repo", url("http://repo.typesafe.com/typesafe/releases"))(Resolver.ivyStylePatterns)
resolvers += "Your Artifactory" at "http://yourstuff.com/artifactory/repo"
resolvers += "mandubian maven bintray" at "http://dl.bintray.com/mandubian/maven"
resolvers += Resolver.mavenLocal
libraryDependencies += "org.scala-sbt" % "io" % "0.13.11"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.7"
libraryDependencies += "ch.qos.logback" % "logback-core" % "1.1.7"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.21"
libraryDependencies += "com.trax.platform" % "trax-elasticsearch-loader" % "1.3.43"
libraryDependencies += "com.trax.platform" % "trax-platform-utils" % "1.3.7"
*/
import com.sksamuel.elastic4s.ElasticDsl._ | |
import com.sksamuel.elastic4s.{ElasticClient, SearchType} | |
import com.trax.common.log.LogbackUtils | |
import com.trax.elasticsearch.EsClient | |
import com.trax.platform.util.scala.RestUtils | |
import com.typesafe.scalalogging.{Logger, LazyLogging} | |
import org.json4s.JsonDSL._ | |
import org.json4s.jackson.JsonMethods._ | |
import org.slf4j.LoggerFactory | |
import scala.annotation.tailrec | |
import scala.collection.JavaConversions._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.duration._ | |
import scala.concurrent.{Await, Future} | |
// Set the Logback level to INFO before any logger is used.
// NOTE(review): the null first argument presumably selects the root logger — confirm against LogbackUtils.
LogbackUtils.setLogLevel(null, "INFO")

// Script-wide logger: a scala-logging wrapper around the SLF4J logger named "log".
val logger = {
  val slf4jLogger = LoggerFactory.getLogger("log")
  Logger(slf4jLogger)
}
/**
 * Walks an Elasticsearch scroll page by page and posts one "index" message per hit.
 *
 * Each call fetches the next page for `scrollId` (keep-alive "1m"), builds a JSON
 * message for every hit, posts the messages concurrently, waits up to one hour for
 * the page's posts to finish, and then recurses with the scroll id returned by the
 * response. The recursion stops at the first page that contains no hits.
 *
 * Hits that do not carry a value for "Environment.EnvId" are skipped with a warning
 * instead of aborting the whole run with a NullPointerException /
 * NoSuchElementException; they are not included in the returned count.
 *
 * @param client   client used to execute the scroll request
 * @param scrollId scroll id of the page to fetch
 * @param total    number of messages posted by the previous pages
 * @return `total` plus the number of messages posted from this page onwards
 */
@tailrec
def next(client: ElasticClient, scrollId: String, total: Int): Int = {
  // NOTE(review): the scroll is kept alive for "1m" while a page's posts may take up
  // to an hour below — confirm the context cannot expire between pages.
  val page = client.execute { search scroll scrollId keepAlive "1m" }.await
  val hits = page.getHits.getHits
  if (hits.isEmpty) total
  else {
    val posts = hits.toSeq.flatMap { hit =>
      // `field` may be null when the hit lacks the field, and the value list may be empty.
      val envId = Option(hit.field("Environment.EnvId")).flatMap(_.getValues.headOption)
      envId match {
        case Some(env) =>
          val msg = compact(render(("env_id" -> env.toString) ~ ("action" -> "index") ~ ("type" -> "fb") ~ ("id" -> hit.id)))
          logger.debug(msg)
          Some(Future {
            RestUtils.postRestContent("http://someserver/sendmessage/invoice_search", msg, "application/json")
          })
        case None =>
          logger.warn(s"skipping hit ${hit.id}: no Environment.EnvId value")
          None
      }
    }
    // Block until every post for this page has completed before requesting the next page.
    Await.result(Future.sequence(posts), 1.hour)
    next(client, page.getScrollId, total + posts.size)
  }
}
// Obtain the elastic4s client from the shared EsClient wrapper.
val client = EsClient.getInstance("somecluster", 9300, "cluster_name").client

// Scan-type scroll over "fb" documents in the "fps-fbs*" indices whose
// FbNorm.Businessflow matches the wildcard "*unknown"; only the two listed
// fields are requested and the scroll is kept alive for "1m".
val unknownFlowScan =
  search in "fps-fbs*" / "fb" size(1000) fields("FbNorm.Businessflow","Environment.EnvId") query wildcardQuery("FbNorm.Businessflow","*unknown") searchType SearchType.Scan scroll "1m"

// Run the initial request and report how many documents matched.
val initial = client.execute(unknownFlowScan).await
val matched = initial.getHits.getTotalHits
logger.info(s"${matched} hits")

// Walk the scroll from the initial scroll id, starting the counter at zero,
// then report how many messages were posted.
val fixedCount = next(client, initial.getScrollId, 0)
logger.info(s"fixed ${fixedCount} docs")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment