Wangjing Ke wangjingke

wangjingke / sparkHelpers.scala
Created April 24, 2018 17:39
Some helper functions for importing data from Hadoop into Spark
object sparkHelpers extends Serializable {
// function to ingest from Hadoop and convert to Spark dataframe
def readHadoopToSparkDF(sc: org.apache.spark.SparkContext, sqlContext: org.apache.spark.sql.SQLContext, hdfs_path: String, schema: List[org.apache.spark.sql.types.DataType], sep: String = "\t", cols: Array[String] = Array()): org.apache.spark.sql.DataFrame = {
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val rdd = sc.textFile(hdfs_path)
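// read the first line lazily so it is only fetched when no column names are supplied
lazy val firstLine = rdd.first
val header = if (cols.length == 0) firstLine.split(sep).map(_.trim) else cols
// drop the header line by comparing each row to the raw first line (a String), not to the split Array
val body = if (cols.length == 0) rdd.filter(row => row != firstLine) else rdd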
val df_schema_list = (header, schema, List.fill(schema.length)(true)).zipped.toList
You can use select with varargs including *:
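import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum
import spark.implicits._
// keep every existing column ($"*") and append one cumulative sum per listed column
df.select($"*" +: Seq("A", "B", "C").map(c =>
  sum(c).over(Window.partitionBy("ID").orderBy("time")).alias(s"cum$c")
): _*)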
This:
Maps column names to window expressions with Seq("A", ...).map(...)
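The prepend-and-expand pattern used in that call can be tried without a Spark dependency. The following is only a minimal sketch: `VarargsSketch`, `select`, and `withCumulative` are hypothetical names made up for illustration, and `select` is a plain stand-in that takes strings rather than Spark's `Column` values.

```scala
object VarargsSketch {
  // Hypothetical stand-in for a varargs method such as DataFrame.select;
  // it simply returns its arguments in the order they were passed.
  def select(cols: String*): List[String] = cols.toList

  // Prepend "*" to the derived names with +:, then expand the resulting
  // Seq into individual varargs arguments with `: _*`.
  def withCumulative(names: Seq[String]): List[String] =
    select("*" +: names.map(c => s"cum$c"): _*)
}
```

Calling `VarargsSketch.withCumulative(Seq("A", "B", "C"))` passes four separate arguments to `select`: the wildcard first, followed by one derived name per input column.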
package eval
import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox
import java.io.File
object Eval {
def apply[A](string: String): A = {
val toolbox = currentMirror.mkToolBox()