Created
November 15, 2016 09:38
-
-
Save geoHeil/4791773c791bdc1ebd6a2f345b0f0684 to your computer and use it in GitHub Desktop.
Trying to get forward fill (replacing null/NaN values with the last good observation) to work in Spark: http://stackoverflow.com/questions/40592207/spark-scala-fill-nan-with-last-good-observation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package foo

import java.sql.Date

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, SparkSession}
/** A dated observation: `foo` is the (possibly null, after a failed cast) date, `bar` its label. */
final case class FooBar(foo: Date, bar: String)
/** Forward-fill demo: replaces nulls in selected columns with the most recent
  * non-null observation within a bounded window (at most `maxGap` rows back),
  * by coalescing each column with its lagged values.
  *
  * See http://stackoverflow.com/questions/40592207/spark-scala-fill-nan-with-last-good-observation
  */
object Foo extends App {
  // Silence Spark's verbose INFO logging.
  Logger.getLogger("org").setLevel(Level.WARN)

  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    // .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  // "2016-wrongFormat" fails the Date cast and becomes null — exactly the
  // value the forward fill below is meant to impute.
  val myDf = Seq(
    ("2016-01-01", "first"),
    ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  myDf.show
  myDf.printSchema

  // Maximum gap between observations: how far back lag() may look.
  val maxGap: Int = 5
  // Columns whose nulls should be forward-filled.
  // Was `???`, which throws NotImplementedError at startup; use the date column.
  val columnsToFill: List[String] = List("foo")
  // Suffix appended to imputed columns to disambiguate them from the originals.
  val suffix: String = "_"

  // lag() requires an ordered window. Ordering by `bar` here only because the
  // sample data has no other reliable key. NOTE(review): choose a real ordering
  // column — and a partition key; an unpartitioned window forces all rows onto
  // one partition and does not scale.
  val w: WindowSpec = Window.orderBy($"bar")

  /** Coalesces column `c` with its 1..`magGap` lagged values over window `w`,
    * aliased as the original name plus `suffix`. The first non-null among the
    * current and previous values wins, i.e. a bounded forward fill.
    *
    * (Parameter is spelled `magGap` in the original; the name is kept for
    * compatibility, but the body now actually uses it — previously it
    * accidentally closed over the outer `maxGap`.)
    */
  def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String): Column = {
    // Lag values 1..magGap over the window.
    val lagged = (1 to magGap).map(i => lag(col(c), i).over(w))
    // Current value first, then increasingly older observations.
    coalesce(col(c) +: lagged: _*).alias(s"$c$suffix")
  }

  // One imputed column per column to fill. The original call passed the column
  // name where the WindowSpec was expected and omitted the column argument
  // entirely, so it did not type-check; it also ignored the `suffix` val.
  val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)(suffix))

  // Keep all original columns and append the imputed ones.
  val dfImputed = myDf.select($"*" :: lags: _*)
  dfImputed.printSchema
  dfImputed.show

  spark.stop
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Trying to get forward filling to work in Spark. Here is my question: http://stackoverflow.com/questions/40592207/spark-scala-fill-nan-with-last-good-observation
I am following along with http://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation but so far have not been able to get it to work.