Skip to content

Instantly share code, notes, and snippets.

@sujithjay
Last active March 25, 2020 23:31
Show Gist options
  • Save sujithjay/17cf930ae9b9b135dc95457f5b8807a7 to your computer and use it in GitHub Desktop.
Save sujithjay/17cf930ae9b9b135dc95457f5b8807a7 to your computer and use it in GitHub Desktop.
DataFrameNaFunctions.fill in Spark 2.4.5 & Spark 2.4.3
/**
 * Returns a new `DataFrame` that replaces null or NaN values in the specified
 * numeric, string or boolean columns. If a specified column is not a numeric,
 * string or boolean column, it is ignored.
 *
 * The runtime type of `value` selects which columns are eligible:
 *  - a `Double` or `Long` fills any `NumericType` column;
 *  - a `String` fills `StringType` columns;
 *  - a `Boolean` fills `BooleanType` columns.
 *
 * @param value replacement value; must be a `Double`, `Long`, `String` or `Boolean`
 * @param cols  names of the columns eligible for filling, compared against the
 *              schema field names with the session analyzer's resolver
 * @return a projection of `df` containing every column of its schema, in schema
 *         order, with eligible columns replaced by `fillCol`'s result
 * @throws IllegalArgumentException if `value` is null or of an unsupported type
 */
private def fillValue[T](value: T, cols: Seq[String]): DataFrame = {
  // A Long/Double fill value applies to every NumericType column, for example:
  //   val input = Seq[(java.lang.Integer, java.lang.Double)]((null, 164.3)).toDF("a", "b")
  //   input.na.fill(3.1)
  // yields (3, 164.3), not (null, 164.3).
  val targetType = value match {
    case _: Double | _: Long => NumericType
    case _: String => StringType
    case _: Boolean => BooleanType
    case _ =>
      // A type pattern never matches null, so a null `value` lands here; guard
      // `getClass` so callers get an IllegalArgumentException rather than an NPE.
      val typeName = Option(value).map(_.getClass.getName).getOrElse("null")
      throw new IllegalArgumentException(s"Unsupported value type $typeName ($value).")
  }
  val columnEquals = df.sparkSession.sessionState.analyzer.resolver
  val projections = df.schema.fields.map { f =>
    val typeMatches = (targetType, f.dataType) match {
      case (NumericType, dt) => dt.isInstanceOf[NumericType]
      case (StringType, dt) => dt == StringType
      case (BooleanType, dt) => dt == BooleanType
      case _ =>
        // Defensive only: targetType is always one of the three types above.
        throw new IllegalArgumentException(s"$targetType is not matched at fillValue")
    }
    // Only fill if the column is part of the cols list.
    if (typeMatches && cols.exists(col => columnEquals(f.name, col))) {
      fillCol[T](f, value)
    } else {
      // Quote the name so `df.col` treats it as one identifier: an unquoted
      // name such as "a.b" would be parsed as field `b` of column `a` and fail
      // to resolve. Embedded backticks are escaped by doubling them.
      df.col("`" + f.name.replace("`", "``") + "`")
    }
  }
  df.select(projections : _*)
}
/**
 * Returns a new `DataFrame` that replaces null or NaN values in the specified
 * columns. If a specified column is not a numeric, string or boolean column,
 * it is ignored.
 *
 * Columns are selected by attribute identity (`semanticEquals`) rather than by
 * name. This presumably allows columns that share a name to be distinguished;
 * confirm against the caller.
 *
 * The runtime type of `value` selects which columns are eligible:
 *  - a `Double` or `Long` fills any `NumericType` column;
 *  - a `String` fills `StringType` columns;
 *  - a `Boolean` fills `BooleanType` columns.
 *
 * @param value replacement value; must be a `Double`, `Long`, `String` or `Boolean`
 * @param cols  attributes eligible for filling
 * @return a projection with one column per element of `outputAttributes`, in
 *         that order, with eligible columns replaced by `fillCol`'s result
 * @throws IllegalArgumentException if `value` is null or of an unsupported type
 */
private def fillValue[T](value: T, cols: Seq[Attribute]): DataFrame = {
  // A Long/Double fill value applies to every NumericType column, for example:
  //   val input = Seq[(java.lang.Integer, java.lang.Double)]((null, 164.3)).toDF("a", "b")
  //   input.na.fill(3.1)
  // yields (3, 164.3), not (null, 164.3).
  val targetType = value match {
    case _: Double | _: Long => NumericType
    case _: String => StringType
    case _: Boolean => BooleanType
    case _ =>
      // A type pattern never matches null, so a null `value` lands here; guard
      // `getClass` so callers get an IllegalArgumentException rather than an NPE.
      val typeName = Option(value).map(_.getClass.getName).getOrElse("null")
      throw new IllegalArgumentException(s"Unsupported value type $typeName ($value).")
  }
  // NOTE(review): `outputAttributes` is defined outside this view; presumably
  // the output attributes of `df`'s analyzed plan. Confirm.
  val projections = outputAttributes.map { col =>
    val typeMatches = (targetType, col.dataType) match {
      case (NumericType, dt) => dt.isInstanceOf[NumericType]
      case (StringType, dt) => dt == StringType
      case (BooleanType, dt) => dt == BooleanType
      case _ =>
        // Defensive only: targetType is always one of the three types above.
        throw new IllegalArgumentException(s"$targetType is not matched at fillValue")
    }
    // Only fill if the column is part of the cols list.
    if (typeMatches && cols.exists(_.semanticEquals(col))) {
      fillCol(col.dataType, col.name, Column(col), value)
    } else {
      // Pass the attribute through directly; no name-based lookup is involved.
      Column(col)
    }
  }
  df.select(projections : _*)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment