Last active
May 24, 2018 17:48
-
-
Save jamiekt/cea2dab3ea8de91489b31045b302e011 to your computer and use it in GitHub Desktop.
Scala/Spark demo of joining multiple DataFrames on the same columns using implicit classes. Clone the repository, then run it with `sbt run`.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
project
target
metastore_db
derby.log
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Spark 2.3.x artifacts are published for Scala 2.11, so the two versions
// must move in lockstep.
scalaVersion := "2.11.12"

// Single source of truth for the Spark version instead of repeating the
// literal in each dependency line.
val sparkVersion = "2.3.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql"  % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.SparkConf | |
import org.apache.spark.sql.{DataFrame, SparkSession} | |
import java.time.LocalDate | |
object DataFrameExtensions_ {

  /** Pimps [[DataFrame]] with the two feature-extraction entry points
    * used by the demo.
    *
    * Both methods are stubs: they return the receiver unchanged. Real
    * implementations would group by `groupBy` and compute features as at
    * the supplied date.
    */
  implicit class DataFrameExtensions(df: DataFrame) {
    def featuresGroup1(groupBy: Seq[String], asAt: LocalDate): DataFrame = df
    def featuresGroup2(groupBy: Seq[String], asAt: LocalDate): DataFrame = df
  }

  /** Pimps `Seq[DataFrame]` so a collection of frames can be collapsed
    * into one by pairwise joins.
    */
  implicit class SequenceOfDataFrames(seqDF: Seq[DataFrame]) {

    /** Folds the sequence into a single DataFrame, joining successive
      * frames on `joinColumns`; an empty column list degenerates to a
      * cross join.
      *
      * NOTE(review): `reduce` throws on an empty sequence — callers must
      * supply at least one DataFrame.
      */
    def joinDataFramesOnColumns(joinColumns: Seq[String]): DataFrame =
      // Expression form instead of an early `return` (a non-local-return
      // anti-pattern in Scala); behavior is unchanged.
      if (joinColumns.isEmpty) seqDF.reduce(_.crossJoin(_))
      else seqDF.reduce(_.join(_, joinColumns))
  }
}
/** Demo entry point: builds a sample DataFrame, derives two (stub)
  * feature frames from it, and joins them back together on the common
  * group-by columns.
  */
object Application extends App {
  // Clears the security manager; presumably a workaround for a
  // Derby/metastore clash when running under sbt — TODO confirm.
  System.setSecurityManager(null)
  import DataFrameExtensions_._

  val spark = SparkSession
    .builder()
    .config(new SparkConf().setMaster("local[*]"))
    .enableHiveSupport()
    .getOrCreate()
  import spark.implicits._

  // Small fixture dataset to exercise the join helpers.
  val df = Seq((8, "bat"), (64, "mouse"), (-27, "horse")).toDF("number", "word")
  val groupBy = Seq("number", "word")
  val asAt = LocalDate.now()

  // Eta-expand the two feature methods into functions, then apply each to
  // the same (groupBy, asAt) arguments.
  val dataFrames = Seq(df.featuresGroup1(_, _), df.featuresGroup2(_, _)).map(_(groupBy, asAt))
  // Another way of achieving the same:
  // val funcs: List[DataFrame => (Seq[String], java.time.LocalDate) => org.apache.spark.sql.DataFrame] =
  //   List(_.featuresGroup1, _.featuresGroup2)
  // val dataFrames = funcs.map(x => x(df)(groupBy, asAt))

  // `show()` keeps parentheses: it is a side-effecting action.
  dataFrames.joinDataFramesOnColumns(groupBy).show()
  spark.stop()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment