@sadikovi
Last active May 13, 2017 10:25
Riff vs Parquet initial numbers
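Benchmark of the Riff Spark datasource against Parquet on a synthetic DataFrame of 100 million rows with 8 columns (two int, two long, four string). Three runs follow: the first writes sequentially generated data with gzip for both formats, the second repartitions (shuffles) the data before writing, and the third uses snappy compression for Riff. Each run compares write time, on-disk size, point lookups, and range scans. Timings come from the DAGScheduler log lines quoted in the comments, and range scans use foreach(_ => Unit) so rows are scanned but not collected to the driver.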
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.sadikovi.spark.riff._
def row(i: Int): Row = {
  Row(i, i, i.toLong, i.toLong, s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i")
}
val schema = StructType(
  StructField("col1", IntegerType) ::
  StructField("col2", IntegerType) ::
  StructField("col3", LongType) ::
  StructField("col4", LongType) ::
  StructField("col5", StringType) ::
  StructField("col6", StringType) ::
  StructField("col7", StringType) ::
  StructField("col8", StringType) :: Nil)
val df = spark.createDataFrame(sc.parallelize(0 until 100000000, 1000).map(row), schema)
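// Hypothetical helper (not in the original gist): wraps an action in a wall-clock timer so the
// timings below can be reproduced without grepping DAGScheduler log lines. Assumes the same
// spark-shell session; numbers will differ slightly from the logged ones due to scheduling overhead.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.3f s")
  result
}
// example: timed("parquet write") { df.write.parquet("./temp/parquet-test") }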
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
df.write.parquet("./temp/parquet-test")
// 17/05/11 08:57:00 INFO DAGScheduler: Job 0 finished: parquet at <console>:38, took 290.750990 s
// 1.9G temp/parquet-test/
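// Riff write: gzip codec, column filters enabled, and an index on col1, col3, col5 (the columns filtered on in the queries below).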
spark.conf.set("spark.sql.riff.compression.codec", "gzip")
spark.conf.set("spark.sql.riff.column.filter.enabled", "true")
df.write.option("index", "col1,col3,col5").riff("./temp/riff-test")
// 17/05/11 09:01:35 INFO DAGScheduler: Job 0 finished: save at package.scala:42, took 198.533308 s
// 1.8G temp/riff-test/
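// Write comparison (gzip): Parquet 290.8 s / 1.9G vs Riff 198.5 s / 1.8G.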
// == Query test ==
spark.read.parquet("./temp/parquet-test").filter("col1 = 5290").show()
/*
17/05/11 09:03:16 INFO DAGScheduler: Job 9 finished: show at <console>:24, took 0.311518 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4| col5| col6| col7| col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test").filter("col1 = 5290").show()
/*
17/05/11 09:04:56 INFO DAGScheduler: Job 17 finished: show at <console>:27, took 1.778238 s
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3| col5|col2|col4| col6| col7| col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
spark.read.parquet("./temp/parquet-test").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:05:28 INFO DAGScheduler: Job 22 finished: show at <console>:27, took 5.184153 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4| col5| col6| col7| col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:05:59 INFO DAGScheduler: Job 26 finished: show at <console>:27, took 1.806723 s
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3| col5|col2|col4| col6| col7| col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
// == Range scan ==
spark.read.parquet("./temp/parquet-test").filter("col3 between 100 and 100000").foreach(_ => Unit)
// 17/05/11 09:06:31 INFO DAGScheduler: Job 28 finished: foreach at <console>:27, took 0.649757 s
spark.read.riff("./temp/riff-test").filter("col3 between 100 and 100000").foreach(_ => Unit)
// 17/05/11 09:06:53 INFO DAGScheduler: Job 29 finished: foreach at <console>:27, took 3.759603 s
spark.read.parquet("./temp/parquet-test").filter("col5 > 'abc9999'").foreach(_ => Unit)
// 17/05/11 09:07:24 INFO DAGScheduler: Job 31 finished: foreach at <console>:27, took 11.980590 s
spark.read.riff("./temp/riff-test").filter("col5 > 'abc9999'").foreach(_ => Unit)
// 17/05/11 09:07:55 INFO DAGScheduler: Job 32 finished: foreach at <console>:27, took 3.698204 s
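// Summary (sequentially written data): the col1 point lookup and the col3 range scan favour Parquet
// (0.31 s vs 1.78 s and 0.65 s vs 3.76 s), while filters on the string column col5 favour Riff
// (5.18 s vs 1.81 s for equality, 11.98 s vs 3.70 s for the range scan).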
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.sadikovi.spark.riff._
def row(i: Int): Row = {
  Row(i, i, i.toLong, i.toLong, s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i")
}
val schema = StructType(
  StructField("col1", IntegerType) ::
  StructField("col2", IntegerType) ::
  StructField("col3", LongType) ::
  StructField("col4", LongType) ::
  StructField("col5", StringType) ::
  StructField("col6", StringType) ::
  StructField("col7", StringType) ::
  StructField("col8", StringType) :: Nil)
val df = spark.createDataFrame(sc.parallelize(0 until 100000000, 1000).map(row), schema).repartition(1000)
// the data is repartitioned before writing, so values are shuffled across files and per-file statistics become far less selective
df.write.parquet("./temp/parquet-test-large")
// 17/05/11 09:26:44 INFO DAGScheduler: Job 0 finished: parquet at <console>:38, took 185.521304 s
// 4.6G temp/parquet-test-large/ (snappy)
spark.conf.set("spark.sql.riff.column.filter.enabled", "true")
df.write.option("index", "col1,col3,col5").riff("./temp/riff-test-large")
// 17/05/11 09:31:23 INFO DAGScheduler: Job 0 finished: save at package.scala:42, took 215.267650 s
// 2.0G temp/riff-test-large/ (deflate)
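// Write comparison (shuffled data): Parquet 185.5 s / 4.6G (snappy) vs Riff 215.3 s / 2.0G (deflate); note the codecs differ between the two.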
// == Query test ==
spark.read.parquet("./temp/parquet-test-large").filter("col1 = 5290").show()
/*
17/05/11 09:32:32 INFO DAGScheduler: Job 4 finished: show at <console>:24, took 9.439326 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4| col5| col6| col7| col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test-large").filter("col1 = 5290").show()
/*
17/05/11 09:33:04 INFO DAGScheduler: Job 8 finished: show at <console>:27, took 1.838862 s
17/05/11 09:33:04 INFO CodeGenerator: Code generated in 13.423845 ms
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3| col5|col2|col4| col6| col7| col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
spark.read.parquet("./temp/parquet-test-large").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:33:37 INFO DAGScheduler: Job 13 finished: show at <console>:27, took 9.495512 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4| col5| col6| col7| col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test-large").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:34:26 INFO DAGScheduler: Job 17 finished: show at <console>:27, took 1.865259 s
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3| col5|col2|col4| col6| col7| col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
// == Range scan ==
spark.read.parquet("./temp/parquet-test-large").filter("col3 between 100 and 100000").foreach(_ => Unit)
// 17/05/11 09:35:12 INFO DAGScheduler: Job 19 finished: foreach at <console>:27, took 14.934467 s
spark.read.riff("./temp/riff-test-large").filter("col3 between 100 and 100000").foreach(_ => Unit)
// 17/05/11 09:35:34 INFO DAGScheduler: Job 20 finished: foreach at <console>:27, took 3.872620 s
spark.read.parquet("./temp/parquet-test-large").filter("col5 > 'abc9999'").foreach(_ => Unit)
// 17/05/11 09:36:12 INFO DAGScheduler: Job 22 finished: foreach at <console>:27, took 14.949234 s
spark.read.riff("./temp/riff-test-large").filter("col5 > 'abc9999'").foreach(_ => Unit)
// 17/05/11 09:36:28 INFO DAGScheduler: Job 23 finished: foreach at <console>:27, took 3.809892 s
// == Range projected scan ==
spark.read.parquet("./temp/parquet-test-large").filter("col5 > 'abc9999'").select("col1", "col3").foreach(_ => Unit)
// 17/05/11 09:37:14 INFO DAGScheduler: Job 25 finished: foreach at <console>:27, took 4.246997 s
spark.read.riff("./temp/riff-test-large").filter("col5 > 'abc9999'").select("col1", "col3").foreach(_ => Unit)
// 17/05/11 09:37:35 INFO DAGScheduler: Job 26 finished: foreach at <console>:27, took 3.821324 s
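// Summary (shuffled data): point lookups now take ~9.4-9.5 s with Parquet vs ~1.8-1.9 s with Riff,
// range scans ~14.9 s vs ~3.8-3.9 s, and the projected range scan narrows to 4.25 s vs 3.82 s.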
// == Snappy compression ==
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.sadikovi.spark.riff._
def row(i: Int): Row = {
  Row(i, i, i.toLong, i.toLong, s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i")
}
val schema = StructType(
  StructField("col1", IntegerType) ::
  StructField("col2", IntegerType) ::
  StructField("col3", LongType) ::
  StructField("col4", LongType) ::
  StructField("col5", StringType) ::
  StructField("col6", StringType) ::
  StructField("col7", StringType) ::
  StructField("col8", StringType) :: Nil)
val df = spark.createDataFrame(sc.parallelize(0 until 100000000, 800).map(row), schema)
// Parquet:
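// no codec is set for Parquet in this session, so Spark's default (snappy) most likely applies, consistent with the 4.6G size below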
df.write.parquet("./temp/parquet-test")
// 17/05/13 22:00:42 INFO DAGScheduler: Job 0 finished: parquet at <console>:38, took 359.524599 s
// 4.6G temp/parquet-test/
// Riff:
spark.conf.set("spark.sql.riff.compression.codec", "snappy")
df.write.option("index", "col1,col4,col6").riff("./temp/riff-test")
// 17/05/13 22:13:43 INFO DAGScheduler: Job 0 finished: save at package.scala:42, took 253.434462 s
// 3.4G temp/riff-test/
val p = spark.read.parquet("./temp/parquet-test")
p.filter("col1 = 450239").select("col1", "col4", "col8").collect
// 17/05/13 22:16:26 INFO DAGScheduler: Job 1 finished: collect at <console>:33, took 3.714159 s
// res0: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
p.filter("col8 = 'abc450239 abc450239 abc450239'").select("col1", "col4", "col8").collect
// 17/05/13 22:18:36 INFO DAGScheduler: Job 3 finished: collect at <console>:33, took 11.078386 s
// res2: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
p.filter("col8 = 'abc450239 abc450239 abc450239'").collect
// 17/05/13 22:21:47 INFO DAGScheduler: Job 5 finished: collect at <console>:33, took 31.259570 s
val r = spark.read.riff("./temp/riff-test")
r.filter("col1 = 450239").select("col1", "col4", "col8").collect
// 17/05/13 22:17:18 INFO DAGScheduler: Job 2 finished: collect at <console>:33, took 9.581120 s
// res1: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
r.filter("col8 = 'abc450239 abc450239 abc450239'").select("col1", "col4", "col8").collect
// 17/05/13 22:19:11 INFO DAGScheduler: Job 4 finished: collect at <console>:33, took 7.851032 s
// res3: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
r.filter("col8 = 'abc450239 abc450239 abc450239'").collect
// 17/05/13 22:22:22 INFO DAGScheduler: Job 6 finished: collect at <console>:33, took 7.648772 s
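// Summary (snappy): Riff writes faster and smaller (253.4 s / 3.4G vs 359.5 s / 4.6G). The indexed col1
// lookup favours Parquet (3.71 s vs 9.58 s), but filters on the non-indexed col8 favour Riff:
// 11.08 s vs 7.85 s with projection, and 31.26 s vs 7.65 s when all columns are read.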