@geoHeil
Created November 15, 2016 09:38
package foo

import java.sql.Date

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, SparkSession}
case class FooBar(foo: Date, bar: String)

object Foo extends App {
  Logger.getLogger("org").setLevel(Level.WARN)

  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")

  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    // .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._
  // The malformed date string casts to null, creating the gap to fill
  val myDf = Seq(
    ("2016-01-01", "first"),
    ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  myDf.show
  myDf.printSchema
  val maxGap: Int = 5 // Maximum run of consecutive nulls the fill can bridge
  // Columns whose nulls should be filled; here only "foo" contains nulls
  val columnsToFill: List[String] = List("foo")
  val suffix: String = "_" // To disambiguate between original and imputed columns
  // For one column: take lags 1 to maxGap over the window and coalesce them
  // with the current value, so the most recent non-null observation wins
  def makeCoalesce(w: WindowSpec)(maxGap: Int)(suffix: String)(c: String): Column = {
    // Generate lag values between 1 and maxGap
    val lagged = (1 to maxGap).map(i => lag(col(c), i).over(w))
    // Prepend the current value, coalesce, and alias with the suffix
    coalesce(col(c) +: lagged: _*).alias(s"$c$suffix")
  }
  // Order rows by a generated id so "previous row" is well defined; in a real
  // job, partition by a key and order by a proper sequence column instead
  val dfWithId = myDf.withColumn("rowId", monotonically_increasing_id())
  val w: WindowSpec = Window.orderBy($"rowId")

  // For each column whose nulls should be filled, apply makeCoalesce
  val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)(suffix))

  // Finally select everything plus the imputed columns
  val dfImputed = dfWithId.select($"*" :: lags: _*)
  dfImputed.printSchema
  dfImputed.show

  spark.stop
}
geoHeil commented Nov 15, 2016

I am trying to get forward filling to work in Spark. Here is my question: http://stackoverflow.com/questions/40592207/spark-scala-fill-nan-with-last-good-observation
I am following along with http://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation, but so far I have not been able to get it to work.
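For reference, a minimal alternative sketch (not part of the gist above), assuming Spark 2.x: last(..., ignoreNulls = true) over a running window carries the most recent non-null value forward without enumerating lags, so it bridges gaps of any length. The rowId column below is an illustrative assumption used only to define row order.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, monotonically_increasing_id}

// Illustrative: generate a row id so the fill order is well defined
val withId = myDf.withColumn("rowId", monotonically_increasing_id())
// Running frame from the first row up to the current row
val w = Window.orderBy(col("rowId")).rowsBetween(Long.MinValue, 0)
// last with ignoreNulls = true returns the latest non-null value in the frame
val filled = withId.withColumn("foo_filled", last(col("foo"), ignoreNulls = true).over(w))
filled.show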
