Wangjing Ke wangjingke

wangjingke / sparkHelpers.scala
Created Apr 24, 2018
Some functions to help import data from Hadoop into Spark.
object sparkHelpers extends Serializable {
  // Ingest a delimited file from Hadoop and convert it to a Spark DataFrame
  def readHadoopToSparkDF(sc: org.apache.spark.SparkContext, sqlContext: org.apache.spark.sql.SQLContext, hdfs_path: String, schema: List[org.apache.spark.sql.types.DataType], sep: String = "\t", cols: Array[String] = Array()): org.apache.spark.sql.DataFrame = {
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    val rdd = sc.textFile(hdfs_path)
    // If no column names are supplied, take them from the file's first line
    val headerLine = rdd.first
    val header = if (cols.isEmpty) headerLine.split(sep).map(_.trim) else cols
    // Drop the header row; compare against the raw first line, not the split array
    val body = if (cols.isEmpty) rdd.filter(row => row != headerLine) else rdd
    // (name, type, nullable) triples for the DataFrame schema
    val df_schema_list = (header, schema, List.fill(schema.length)(true)).zipped.toList
    // The gist is truncated here; a plausible completion builds the schema and DataFrame.
    // Note: fields are read as strings, so the supplied types should be StringType (or cast downstream).
    val df_schema = StructType(df_schema_list.map { case (name, dtype, nullable) => StructField(name, dtype, nullable) })
    val rows = body.map(line => Row.fromSeq(line.split(sep).map(_.trim)))
    sqlContext.createDataFrame(rows, df_schema)
  }
}
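The (name, type, nullable) pairing above can be sketched without a Spark cluster. This is a minimal sketch with made-up column names, and plain type-name strings standing in for the Spark DataTypes; `zip` plus `map` is used as a version-stable equivalent of the tuple `.zipped` call in the gist:

```scala
// Hypothetical column names and type names, standing in for the Spark DataTypes in the gist
val header = Array("id", "name", "score")
val types = List("IntegerType", "StringType", "DoubleType")
// Pair each name with its type plus a nullable flag, as df_schema_list does above
val schemaList = header.zip(types).map { case (n, t) => (n, t, true) }.toList
println(schemaList)
```

Each triple then maps directly onto a StructField(name, dtype, nullable) when building the StructType.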
Spark apply function on multiple columns at once
You can use select with varargs, including $"*" to keep the existing columns:

import spark.implicits._
df.select($"*" +: Seq("A", "B", "C").map(c => myFunc(col(c))): _*)

Seq("A", ...).map(...) maps the column names to window expressions (myFunc here is a placeholder; the actual expression is elided in the source).
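The varargs mechanics can be shown without Spark. In this sketch, `select` is a made-up stand-in for DataFrame.select and `func(...)` is a placeholder expression name; the point is how `+:` prepends the keep-everything column and `: _*` expands the whole Seq into the varargs parameter:

```scala
// A varargs function standing in for DataFrame.select (illustrative only)
def select(cols: String*): List[String] = cols.toList

// Derived "expressions" for columns A, B, C; func(...) is a placeholder name
val derived = Seq("A", "B", "C").map(c => s"func($c)")
// "*" +: prepends the keep-all marker; : _* expands the Seq as varargs
val selected = select("*" +: derived: _*)
println(selected)
```

The `: _*` ascription is what lets a dynamically built Seq of column expressions feed a varargs signature like select(cols: Column*).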
Eval.scala

package eval

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox // needed for mkToolBox; requires scala-compiler on the classpath

object Eval {
  // Parse and evaluate a Scala source string at runtime, casting the result to A
  def apply[A](string: String): A = {
    val toolbox = currentMirror.mkToolBox()
    // The gist is truncated here; the standard toolbox pattern completes it:
    toolbox.eval(toolbox.parse(string)).asInstanceOf[A]
  }
}
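Assuming the apply method completes in the usual toolbox way (parse the string, then eval the resulting tree), usage looks like the following. This is a self-contained sketch, written as a standalone eval function rather than the packaged object, and it needs scala-compiler on the classpath:

```scala
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

// Standalone version of the toolbox-eval pattern (requires scala-compiler on the classpath)
val toolbox = currentMirror.mkToolBox()
def eval[A](source: String): A = toolbox.eval(toolbox.parse(source)).asInstanceOf[A]

// Evaluate expressions of different result types
val sum = eval[Int]("21 + 21")
val greeting = eval[String]("\"hello\" + \" world\"")
println(sum)      // 42
println(greeting) // hello world
```

The asInstanceOf[A] cast is unchecked: passing a source string whose result type does not match A fails at runtime, not at compile time.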