Skip to content

Instantly share code, notes, and snippets.

@jamiekt
Last active May 24, 2018 17:48
Show Gist options
  • Save jamiekt/cea2dab3ea8de91489b31045b302e011 to your computer and use it in GitHub Desktop.
Save jamiekt/cea2dab3ea8de91489b31045b302e011 to your computer and use it in GitHub Desktop.
Scala Spark demo of joining multiple DataFrames on the same columns using implicit classes. Clone the gist with `git clone`, then run it with `sbt run`.
project
target
metastore_db
derby.log
// Build settings for the demo: Scala 2.11 with Spark 2.3.0.
// NOTE(review): `%%` selects the artifact built for this scalaVersion's binary version,
// so Spark and Scala versions must be changed together.
scalaVersion := "2.11.12"
// Spark core and Spark SQL (SparkSession / DataFrame API used by the demo).
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.0"
// Spark Hive support, required because the demo calls enableHiveSupport().
libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.3.0"
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.time.LocalDate
/** Extension methods used by the demo.
  *
  * Import `DataFrameExtensions_._` to bring both implicit classes into scope.
  */
object DataFrameExtensions_ {

  /** Feature-group methods on a single [[DataFrame]].
    *
    * Both methods are placeholders: they ignore their arguments and return the receiver
    * unchanged. Declared as a value class so the extension does not allocate a wrapper.
    */
  implicit class DataFrameExtensions(private val df: DataFrame) extends AnyVal {

    /** Placeholder for the first feature group.
      *
      * @param groupBy columns the features would be grouped by (currently unused)
      * @param asAt    reference date for the features (currently unused)
      * @return the receiver, unchanged
      */
    def featuresGroup1(groupBy: Seq[String], asAt: LocalDate): DataFrame = df

    /** Placeholder for the second feature group.
      *
      * @param groupBy columns the features would be grouped by (currently unused)
      * @param asAt    reference date for the features (currently unused)
      * @return the receiver, unchanged
      */
    def featuresGroup2(groupBy: Seq[String], asAt: LocalDate): DataFrame = df
  }

  /** Helpers for combining a sequence of DataFrames into one. */
  implicit class SequenceOfDataFrames(private val seqDF: Seq[DataFrame]) extends AnyVal {

    /** Joins every DataFrame in the sequence, left to right.
      *
      * With a non-empty `joinColumns`, consecutive frames are combined with
      * `Dataset.join(right, usingColumns)` on those columns. With an empty `joinColumns`
      * there is nothing to join on, so frames are combined with `crossJoin` instead.
      *
      * @param joinColumns names of the columns to join on; expected to be present in every frame
      * @return the combined DataFrame (a single-element sequence yields that element)
      * @throws IllegalArgumentException if the sequence is empty
      */
    def joinDataFramesOnColumns(joinColumns: Seq[String]): DataFrame = {
      // Fail fast with a clear message instead of reduce's opaque "empty.reduceLeft".
      require(seqDF.nonEmpty, "joinDataFramesOnColumns requires at least one DataFrame")
      if (joinColumns.isEmpty) seqDF.reduce(_.crossJoin(_))
      else seqDF.reduce(_.join(_, joinColumns))
    }
  }
}
/** Demo entry point: builds a small DataFrame, derives two feature frames from it and joins them.
  *
  * Uses an explicit `main` instead of extending `scala.App`. The Spark documentation warns that
  * subclasses of `scala.App` may not work correctly, because `App` defers initialisation of the
  * object's fields through `DelayedInit`.
  */
object Application {

  def main(args: Array[String]): Unit = {
    // NOTE(review): presumably clears a SecurityManager installed by the launcher
    // (e.g. sbt's in-process `run`) that interferes with Spark/Hive — confirm why this is needed.
    System.setSecurityManager(null)

    import DataFrameExtensions_._

    val spark = SparkSession
      .builder()
      .config(new SparkConf().setMaster("local[*]"))
      .enableHiveSupport()
      .getOrCreate()

    try {
      import spark.implicits._

      val df = Seq((8, "bat"), (64, "mouse"), (-27, "horse")).toDF("number", "word")
      val groupBy = Seq("number", "word")
      val asAt = LocalDate.now()

      // Partially apply each feature method (placeholder syntax), then apply every resulting
      // function to the same arguments.
      val dataFrames = Seq(df.featuresGroup1(_, _), df.featuresGroup2(_, _)).map(_(groupBy, asAt))
      // Another way of achieving the same:
      // val funcs: List[DataFrame => (Seq[String], java.time.LocalDate) => org.apache.spark.sql.DataFrame] =
      //   List(_.featuresGroup1, _.featuresGroup2)
      // val dataFrames = funcs.map(x => x(df)(groupBy, asAt))

      dataFrames.joinDataFramesOnColumns(groupBy).show()
    } finally {
      // Release the session even if building or printing the result fails; otherwise it
      // lingers in the JVM (e.g. when run in-process by sbt).
      spark.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment