Skip to content

Instantly share code, notes, and snippets.

@raymondtay
Last active August 22, 2019 10:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raymondtay/a2000458ff83eee6dad9db8c8f1b4292 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrameReader
/** Builds a [[DataFrameReader]] for the "es" (elasticsearch-hadoop) data source,
  * with every supplied key/value pair applied as a reader option.
  *
  * @param options elasticsearch-hadoop configuration options (key -> value)
  * @return a reader ready to `load` an Elasticsearch index
  */
def configureReaderWithElasticSearch(options: Map[String,String]) : DataFrameReader = {
  val baseReader = spark.read.format("es")
  // Thread the reader through each option; the fold's result carries all of them.
  options.foldLeft(baseReader) { case (reader, (key, value)) =>
    reader.option(key, value)
  }
}
/** Loads an Elasticsearch index through the given (pre-configured) reader.
  *
  * @param index  index name; the document type is assumed to share this name,
  *               hence the "index/index" load path — TODO confirm against the cluster mapping
  * @param reader a reader already configured for the "es" data source
  * @return the index contents as a [[DataFrame]]
  */
def loadDataFromElasticSearch(index: String)(reader: DataFrameReader) : DataFrame = {
  val path = s"$index/$index"
  reader.load(path)
}
/** Returns a loader function: given elasticsearch-hadoop options, configures a
  * reader and loads the named cluster index with it.
  *
  * @param cluster index name to load (passed to [[loadDataFromElasticSearch]])
  */
def loadDataFromES2(cluster: String) : Map[String,String] => DataFrame =
  // Explicit lambda rather than `compose`: configure first, then load.
  options => loadDataFromElasticSearch(cluster)(configureReaderWithElasticSearch(options))
/** Loads "cluster" documents from Elasticsearch, keeps only those matching the
  * given issue type, and joins in additional columns.
  *
  * @param issueTypeId value the `issueTypeId` column must equal
  * @return the filtered, enriched clusters [[DataFrame]]
  */
def loadClusters2(issueTypeId: String): DataFrame = {
  // Connector settings; "es.read.metadata" surfaces document metadata
  // (including _id) as a `_metadata` struct column.
  val esConfigOptions = Map(
    ConfigurationOptions.ES_NODES_WAN_ONLY -> Settings.esWanOnly,
    ConfigurationOptions.ES_PORT -> Settings.esPort,
    ConfigurationOptions.ES_NODES -> Settings.esNodes,
    "es.read.metadata" -> "true"
  )
  val raw = loadDataFromES2("cluster")(esConfigOptions)
  // Promote the ES document id to a regular column, then drop metadata
  // and columns unused downstream.
  val withId = raw.withColumn("id", $"_metadata._id")
  val trimmed = withId.drop("_metadata", "changeRequestIds", "comments", "rate", "clusterId", "originalId")
  val clusters = trimmed.filter($"issueTypeId" === issueTypeId)
  // addColumnsToDataframe / joinColumns are defined elsewhere in this codebase.
  addColumnsToDataframe(clusters, joinColumns.toArray)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment