Skip to content

Instantly share code, notes, and snippets.

@byF
Last active August 29, 2015 14:05
Show Gist options
  • Save byF/588a7a2153b0a459d9a1 to your computer and use it in GitHub Desktop.
Save byF/588a7a2153b0a459d9a1 to your computer and use it in GitHub Desktop.
Scala script for the aggregation task http://www.aproint.com/aggregation-with-spark-sql
/*
(c) 2014 Zdenek Farana; use as-is; no guarantees
Because of a bug in the Spark SQL parser, test B works only with the fork at https://github.com/aproint/spark
*/
/** Evaluates `f` once and returns how long the evaluation took, in seconds.
  *
  * Uses `System.nanoTime`, the JVM clock meant for measuring elapsed time. The original
  * `System.currentTimeMillis` reads wall-clock time, which can be adjusted while the
  * program runs and so can distort (or even negate) a measured interval.
  *
  * @param f the code to time; passed by name so it is evaluated inside the timed region
  * @return elapsed time in seconds (the value produced by `f` is discarded)
  */
def measure(f: => Any): Double = {
  val start = System.nanoTime()
  f
  (System.nanoTime() - start) / 1e9
}
// SQL entry point built on the SparkContext `sc` (not defined in this script; presumably the
// one spark-shell predefines).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Brings the implicit RDD-of-case-class => SchemaRDD conversion into scope; the script relies
// on it to call `registerAsTable` on the parsed RDD.
import sqlContext.createSchemaRDD

/** One parsed row of randomData.csv, with calendar fields pre-extracted for grouping.
  *
  * @param year       calendar year of `created_on`
  * @param dayOfYear  day of the year of `created_on`
  * @param hour       hour of `created_on`
  * @param created_on the row's timestamp (first CSV column)
  * @param value      the measured value (second CSV column)
  */
final case class RandomData(year: Int, dayOfYear: Int, hour: Int, created_on: java.sql.Timestamp, value: Double)
// Parse randomData.csv into the "RandomData" table, mark it for in-memory caching, and time
// the setup.
// NOTE(review): in Spark 1.x `textFile`, `registerAsTable` and `cacheTable` are lazy, so P
// presumably measures little more than plan construction. The real read/parse/cache cost
// would then land in the first query (A1). Confirm for the Spark version used.
val P = measure {
  import java.text.SimpleDateFormat
  import java.util.Calendar

  sc.textFile("randomData.csv")
    .map(_.split(","))
    .filter(_(0) != "created_on") // drop the CSV header row
    .mapPartitions { rows =>
      // SimpleDateFormat and Calendar are mutable and not thread-safe, so each partition
      // builds its own pair instead of sharing driver-side instances captured by the closure.
      // Both use the JVM default time zone.
      val sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
      val calendar = Calendar.getInstance()
      rows.map { r =>
        val date = sdf.parse(r(0))
        calendar.setTime(date)
        RandomData(
          calendar.get(Calendar.YEAR),
          calendar.get(Calendar.DAY_OF_YEAR),
          // HOUR_OF_DAY is 0-23. Calendar.HOUR is the 12-hour clock (0-11) and would put
          // e.g. 01:00 and 13:00 into the same hour group.
          calendar.get(Calendar.HOUR_OF_DAY),
          new java.sql.Timestamp(date.getTime),
          r(1).toDouble)
      }
    }
    .registerAsTable("RandomData")
  sqlContext.cacheTable("RandomData") // alternatively: sqlContext.sql("CACHE TABLE RandomData")
}
// Test A: per-day row count plus AVG/MIN/MAX of `value` over the whole table, ordered by
// (year, dayOfYear). The same query is collected twice and each run is timed separately:
// A1 is the first execution, A2 the repeat.
val testA = sqlContext.sql("""SELECT year,
dayOfYear,
COUNT(*),
AVG(value),
MIN(value),
MAX(value) FROM RandomData GROUP BY year,dayOfYear ORDER BY year,dayOfYear""")
val A1 = measure { testA.collect() }
val A2 = measure { testA.collect() }
// Test B: per-hour row count plus AVG/MIN/MAX of `value`, restricted to rows whose
// created_on lies between 2012-07-16 00:00:00 and 01:00:00 (both bounds inclusive).
// Per the header comment, this query needs the aproint/spark fork.
// B1 times the first execution, B2 the repeat.
val testB = sqlContext.sql("""SELECT year,
dayOfYear,
hour,
COUNT(*),
AVG(value),
MIN(value),
MAX(value) FROM RandomData
WHERE created_on >= CAST('2012-07-16 00:00:00' AS TIMESTAMP) AND created_on <= CAST('2012-07-16 01:00:00' AS TIMESTAMP)
GROUP BY year,dayOfYear,hour ORDER BY year,dayOfYear,hour""")
val B1 = measure { testB.collect() }
val B2 = measure { testB.collect() }
// Report all timings in seconds with millisecond precision. Locale.ROOT pins '.' as the
// decimal separator. Plain `format` follows the JVM default locale and would print e.g.
// "1,234s" under a Czech locale.
println("""Prep time: %.3fs
A: %.3fs; %.3fs
B: %.3fs; %.3fs""".formatLocal(java.util.Locale.ROOT, P, A1, A2, B1, B2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment