Skip to content

Instantly share code, notes, and snippets.

@jamesrajendran
Last active March 2, 2022 07:29
Show Gist options
  • Save jamesrajendran/5f90b12c0b3ee657dbf6471352f1710f to your computer and use it in GitHub Desktop.
Save jamesrajendran/5f90b12c0b3ee657dbf6471352f1710f to your computer and use it in GitHub Desktop.
spark dataframe examples cookbook read
// Read json
// Explode
// Scala version: build a one-row DataFrame from an in-memory JSON string, then
// explode the array column "b" so each array element lands in its own row.
val jsonRecords = sc.parallelize(Seq("""{"a":1,"b":[2,3]} """))
val testDF = sqlContext.read.json(jsonRecords)
testDF.printSchema()
// Overwrite column "b" with its exploded elements; column "a" is repeated per element.
val flattenedDF = testDF.withColumn("b", explode(testDF("b")))
flattenedDF.printSchema()
flattenedDF.show()
# Python version
# Build a one-row DataFrame from an in-memory JSON string, then explode the
# array column "b" so each array element lands in its own row.
from pyspark.sql import *
from pyspark.sql.functions import explode
testDF = sqlContext.read.json(sc.parallelize(['{"a":1,"b":[2,3]}']))
# printSchema is a method: it must be called, otherwise the statement only
# references the bound method and prints nothing.
testDF.printSchema()
flattenedDF = testDF.withColumn("b", explode("b"))
flattenedDF.show()
//combineByKey
// Per-key accumulator: (number of scores, running total of the scores).
type ScoreCollector = (Int, Double)
// One aggregated entry: (person name, (number of scores, total score)).
type PersonScores = (String, (Int, Double))
val initialScores = Array(
  ("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0),
  ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0)
)
val wilmaAndFredScores = sc.parallelize(initialScores).cache()
// createCombiner: turns a single score into an accumulator holding just that score.
val createScoreCombiner = (score: Double) => (1, score)
// mergeValue: folds one more score into an existing accumulator.
val scoreCombiner = (collector: ScoreCollector, score: Double) =>
  (collector._1 + 1, collector._2 + score)
// mergeCombiners: adds two accumulators component-wise.
val scoreMerger = (collector1: ScoreCollector, collector2: ScoreCollector) =>
  (collector1._1 + collector2._1, collector1._2 + collector2._2)
// Result pairs each name with its (count, total) accumulator.
val scores = wilmaAndFredScores.combineByKey(createScoreCombiner, scoreCombiner, scoreMerger)
// Maps one (name, (count, total)) entry to (name, total / count).
val averagingFunction = (personScore: PersonScores) =>
  personScore match {
    case (name, (numberScores, totalScore)) => (name, totalScore / numberScores)
  }
// Collect the per-person (count, total) pairs to the driver as a map and
// convert each entry into (name, average).
val averageScores = scores.collectAsMap().map(averagingFunction)
// Header names the API being demonstrated (was misspelled "CombingByKey").
println("Average Scores using CombineByKey")
// Print one line per person; iteration order of the collected map is not specified.
averageScores.foreach { case (name, average) =>
  println(s"${name}'s average score : ${average}")
}
//aggregateByKey 1
// Gathers the distinct values seen under each key into a mutable set.
import scala.collection.mutable
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Split every "key=value" string and keep the first two pieces as a (key, value) pair
val kv = data.map { entry =>
  val parts = entry.split("=")
  (parts(0), parts(1))
}.cache()
// Zero value handed to aggregateByKey: an empty set of strings.
val initialSet = mutable.HashSet[String]()
// seqOp: add one value to the set accumulated so far (+= returns the set itself).
val addToSet = (seen: mutable.HashSet[String], value: String) => seen += value
// combOp: fold the second set into the first and return it.
val mergePartitionSets = (left: mutable.HashSet[String], right: mutable.HashSet[String]) => left ++= right
val uniqueByKey = kv.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
//aggregateByKey 2
// Counts how many values occur under each key.
// NOTE: keysWithValuesList, data and kv repeat the declarations of the previous
// example; this relies on REPL-style redefinition and would not compile if both
// examples shared a single ordinary scope.
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Split every "key=value" string and keep the first two pieces as a (key, value) pair
val kv = data.map(line => line.split("=")).map(parts => (parts(0), parts(1))).cache()
// Zero value handed to aggregateByKey: every count starts at 0.
val initialCount = 0
// seqOp: each value bumps the running count by one; the value itself is ignored.
val addToCounts = (runningCount: Int, _: String) => runningCount + 1
// combOp: add two partial counts.
val sumPartitionCounts = (left: Int, right: Int) => left + right
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment