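You can use select with varargs including *:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._
// Keep every existing column and append one cumulative sum per listed column.
df.select($"*" +: Seq("A", "B", "C").map(c =>
  sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)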
This:
  • Maps column names to window expressions with Seq("A", ...).map(...).
  • Prepends all pre-existing columns with $"*" +: ....
  • Unpacks the combined sequence with ... : _*.
and can be generalized as:
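import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
/**
 * Selects all existing columns plus f(c) for each c in cols.
 * Uses col("*") rather than $"*" so that spark.implicits._ need not be in scope.
 *
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 */
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column): DataFrame =
  df.select(col("*") +: cols.map(c => f(c)): _*)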
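As a usage sketch, the select-based helper can be applied to a small DataFrame. The local SparkSession, the sample rows, and the cumSum helper are assumptions made for illustration and are not part of the original answer; the block expects Spark SQL on the classpath and is written script-style (for example, pasted into a REPL).

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

// Assumption: a local session for experimentation only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("withColumns-select-demo")
  .getOrCreate()

// Hypothetical sample data using the column names from the answer.
val df = spark.createDataFrame(Seq(
  (1, 1, 10, 1, 100),
  (1, 2, 20, 2, 200),
  (2, 1, 5, 3, 300)
)).toDF("ID", "time", "A", "B", "C")

// Same shape as the helper above: all existing columns plus f(c) for each c.
def withColumns(cols: Seq[String], df: DataFrame, f: String => Column): DataFrame =
  df.select(col("*") +: cols.map(c => f(c)): _*)

// Hypothetical helper: running sum of column c per ID, ordered by time.
def cumSum(c: String): Column =
  sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")

val result = withColumns(Seq("A", "B", "C"), df, cumSum)
result.orderBy("ID", "time").show()
```

Because the new columns are named through alias inside f, this variant needs no separate naming function.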
If you find withColumn syntax more readable, you can use foldLeft:
// Start from df and add one cumulative-sum column per name.
Seq("A", "B", "C").foldLeft(df)((acc, c) =>
  acc.withColumn(s"cum$c", sum(c).over(Window.partitionBy("ID").orderBy("time")))
)
which can be generalized, for example, to:
/**
 * @param cols a sequence of columns to transform
 * @param df an input DataFrame
 * @param f a function to be applied on each col in cols
 * @param name a function mapping from input to output name.
 */
def withColumns(cols: Seq[String], df: DataFrame,
    f: String => Column, name: String => String = identity): DataFrame =
  cols.foldLeft(df)((acc, c) => acc.withColumn(name(c), f(c)))
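The effect of the name parameter is easiest to see side by side. The following is a minimal sketch under the same assumptions as before (a local SparkSession, made-up sample rows, and a hypothetical cumSum helper); it relies on withColumn replacing an existing column when the output name matches an input name.

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Assumption: a local session for experimentation only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("withColumns-foldLeft-demo")
  .getOrCreate()

// Hypothetical sample data using the column names from the answer.
val df = spark.createDataFrame(Seq(
  (1, 1, 10, 1, 100),
  (1, 2, 20, 2, 200),
  (2, 1, 5, 3, 300)
)).toDF("ID", "time", "A", "B", "C")

// Same shape as the foldLeft-based helper above.
def withColumns(cols: Seq[String], df: DataFrame,
    f: String => Column, name: String => String = identity): DataFrame =
  cols.foldLeft(df)((acc, c) => acc.withColumn(name(c), f(c)))

// Hypothetical helper: running sum of column c per ID, ordered by time.
def cumSum(c: String): Column =
  sum(c).over(Window.partitionBy("ID").orderBy("time"))

// Explicit naming function: cumA, cumB, cumC are appended.
val added = withColumns(Seq("A", "B", "C"), df, cumSum, c => s"cum$c")

// Default identity naming: A, B, C are overwritten in place.
val replaced = withColumns(Seq("A", "B", "C"), df, cumSum)

added.printSchema()
replaced.printSchema()
```

One design consideration: each withColumn call adds a projection to the query plan, so for a long list of columns the select-based variant is likely to produce a smaller plan.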