Riff vs Parquet initial numbers
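Initial benchmark of the Riff file format (com.github.sadikovi.spark.riff) against Parquet in Spark: three runs over 100 million generated rows with eight columns (two ints, two longs, four strings), comparing write time, on-disk size, and filter/scan latency. All timings are the DAGScheduler job times reported in the Spark shell logs.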
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.sadikovi.spark.riff._

def row(i: Int): Row = {
  Row(i, i, i.toLong, i.toLong, s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i")
}

val schema = StructType(
  StructField("col1", IntegerType) ::
  StructField("col2", IntegerType) ::
  StructField("col3", LongType) ::
  StructField("col4", LongType) ::
  StructField("col5", StringType) ::
  StructField("col6", StringType) ::
  StructField("col7", StringType) ::
  StructField("col8", StringType) :: Nil)

val df = spark.createDataFrame(sc.parallelize(0 until 100000000, 1000).map(row), schema)
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")
df.write.parquet("./temp/parquet-test")
// 17/05/11 08:57:00 INFO DAGScheduler: Job 0 finished: parquet at <console>:38, took 290.750990 s
// 1.9G temp/parquet-test/

spark.conf.set("spark.sql.riff.compression.codec", "gzip")
spark.conf.set("spark.sql.riff.column.filter.enabled", "true")
df.write.option("index", "col1,col3,col5").riff("./temp/riff-test")
// 17/05/11 09:01:35 INFO DAGScheduler: Job 0 finished: save at package.scala:42, took 198.533308 s
// 1.8G temp/riff-test/
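// Note: the rows are generated in ascending order and written without a
// shuffle, so each of the 1000 output files covers a narrow, disjoint range
// of col1/col3; per-file min/max statistics can prune nearly every file for
// a point lookup.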
// == Query test ==
spark.read.parquet("./temp/parquet-test").filter("col1 = 5290").show()
/*
17/05/11 09:03:16 INFO DAGScheduler: Job 9 finished: show at <console>:24, took 0.311518 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4|                col5|                col6|                col7|                col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test").filter("col1 = 5290").show()
/*
17/05/11 09:04:56 INFO DAGScheduler: Job 17 finished: show at <console>:27, took 1.778238 s
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3|                col5|col2|col4|                col6|                col7|                col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
spark.read.parquet("./temp/parquet-test").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:05:28 INFO DAGScheduler: Job 22 finished: show at <console>:27, took 5.184153 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4|                col5|                col6|                col7|                col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:05:59 INFO DAGScheduler: Job 26 finished: show at <console>:27, took 1.806723 s
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3|                col5|col2|col4|                col6|                col7|                col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
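// Sketch (standard Spark API, not part of the original run): one can confirm
// predicate pushdown by inspecting the physical plan; the Parquet FileScan
// node lists the predicate under PushedFilters.
spark.read.parquet("./temp/parquet-test").filter("col1 = 5290").explain()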
// == Range scan ==
spark.read.parquet("./temp/parquet-test").filter("col3 between 100 and 100000").foreach(_ => ())
// 17/05/11 09:06:31 INFO DAGScheduler: Job 28 finished: foreach at <console>:27, took 0.649757 s
spark.read.riff("./temp/riff-test").filter("col3 between 100 and 100000").foreach(_ => ())
// 17/05/11 09:06:53 INFO DAGScheduler: Job 29 finished: foreach at <console>:27, took 3.759603 s
spark.read.parquet("./temp/parquet-test").filter("col5 > 'abc9999'").foreach(_ => ())
// 17/05/11 09:07:24 INFO DAGScheduler: Job 31 finished: foreach at <console>:27, took 11.980590 s
spark.read.riff("./temp/riff-test").filter("col5 > 'abc9999'").foreach(_ => ())
// 17/05/11 09:07:55 INFO DAGScheduler: Job 32 finished: foreach at <console>:27, took 3.698204 s
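On this naturally ordered data, Parquet wins the integer point lookup (0.31 s vs 1.78 s) and the numeric range scan (0.65 s vs 3.76 s), while Riff wins both string predicates (1.8 s vs 5.2 s equality, 3.7 s vs 12.0 s range) and the write (198 s vs 291 s) at a slightly smaller size (1.8G vs 1.9G).

The timings above are scraped from DAGScheduler log lines. A minimal alternative, assuming Spark 2.1+, is to wrap each action in spark.time, which prints the wall-clock time of the enclosed block:

// Print the time taken by the enclosed action directly instead of grepping logs.
spark.time {
  spark.read.parquet("./temp/parquet-test").filter("col1 = 5290").count()
}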
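The second run uses the same schema and row generator but adds repartition(1000) before the write, shuffling rows randomly across output files; no codecs are set explicitly (the size comments below note snappy for Parquet and deflate for Riff).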
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.sadikovi.spark.riff._

def row(i: Int): Row = {
  Row(i, i, i.toLong, i.toLong, s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i")
}

val schema = StructType(
  StructField("col1", IntegerType) ::
  StructField("col2", IntegerType) ::
  StructField("col3", LongType) ::
  StructField("col4", LongType) ::
  StructField("col5", StringType) ::
  StructField("col6", StringType) ::
  StructField("col7", StringType) ::
  StructField("col8", StringType) :: Nil)

val df = spark.createDataFrame(sc.parallelize(0 until 100000000, 1000).map(row), schema).repartition(1000)
// The repartition shuffles rows randomly across files, so per-file min/max
// statistics no longer describe narrow ranges (see the sortWithinPartitions
// sketch after this snippet).
df.write.parquet("./temp/parquet-test-large")
// 17/05/11 09:26:44 INFO DAGScheduler: Job 0 finished: parquet at <console>:38, took 185.521304 s
// 4.6G temp/parquet-test-large/ (snappy)
spark.conf.set("spark.sql.riff.column.filter.enabled", "true")
df.write.option("index", "col1,col3,col5").riff("./temp/riff-test-large")
// 17/05/11 09:31:23 INFO DAGScheduler: Job 0 finished: save at package.scala:42, took 215.267650 s
// 2.0G temp/riff-test-large/ (deflate)
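// After the shuffle, per-file min/max statistics span almost the full key
// range and stats-based pruning becomes largely ineffective; this is where
// the Riff column filters on col1, col3, col5 should pay off.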
// == Query test ==
spark.read.parquet("./temp/parquet-test-large").filter("col1 = 5290").show()
/*
17/05/11 09:32:32 INFO DAGScheduler: Job 4 finished: show at <console>:24, took 9.439326 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4|                col5|                col6|                col7|                col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test-large").filter("col1 = 5290").show()
/*
17/05/11 09:33:04 INFO DAGScheduler: Job 8 finished: show at <console>:27, took 1.838862 s
17/05/11 09:33:04 INFO CodeGenerator: Code generated in 13.423845 ms
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3|                col5|col2|col4|                col6|                col7|                col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
spark.read.parquet("./temp/parquet-test-large").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:33:37 INFO DAGScheduler: Job 13 finished: show at <console>:27, took 9.495512 s
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|col1|col2|col3|col4|                col5|                col6|                col7|                col8|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
|5290|5290|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+----+----+--------------------+--------------------+--------------------+--------------------+
*/
spark.read.riff("./temp/riff-test-large").filter("col5 = 'abc5290 abc5290 abc5290'").show()
/*
17/05/11 09:34:26 INFO DAGScheduler: Job 17 finished: show at <console>:27, took 1.865259 s
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|col1|col3|                col5|col2|col4|                col6|                col7|                col8|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
|5290|5290|abc5290 abc5290 a...|5290|5290|abc5290 abc5290 a...|abc5290 abc5290 a...|abc5290 abc5290 a...|
+----+----+--------------------+----+----+--------------------+--------------------+--------------------+
*/
// == Range scan ==
spark.read.parquet("./temp/parquet-test-large").filter("col3 between 100 and 100000").foreach(_ => ())
// 17/05/11 09:35:12 INFO DAGScheduler: Job 19 finished: foreach at <console>:27, took 14.934467 s
spark.read.riff("./temp/riff-test-large").filter("col3 between 100 and 100000").foreach(_ => ())
// 17/05/11 09:35:34 INFO DAGScheduler: Job 20 finished: foreach at <console>:27, took 3.872620 s
spark.read.parquet("./temp/parquet-test-large").filter("col5 > 'abc9999'").foreach(_ => ())
// 17/05/11 09:36:12 INFO DAGScheduler: Job 22 finished: foreach at <console>:27, took 14.949234 s
spark.read.riff("./temp/riff-test-large").filter("col5 > 'abc9999'").foreach(_ => ())
// 17/05/11 09:36:28 INFO DAGScheduler: Job 23 finished: foreach at <console>:27, took 3.809892 s
// == Range projected scan ==
spark.read.parquet("./temp/parquet-test-large").filter("col5 > 'abc9999'").select("col1", "col3").foreach(_ => ())
// 17/05/11 09:37:14 INFO DAGScheduler: Job 25 finished: foreach at <console>:27, took 4.246997 s
spark.read.riff("./temp/riff-test-large").filter("col5 > 'abc9999'").select("col1", "col3").foreach(_ => ())
// 17/05/11 09:37:35 INFO DAGScheduler: Job 26 finished: foreach at <console>:27, took 3.821324 s
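On the shuffled data Riff is faster across the board: roughly 1.8-1.9 s vs 9.4-9.5 s on the point lookups and 3.8-3.9 s vs ~15 s on the range scans; only the projected scan is close (3.8 s vs 4.2 s). As referenced above, a sketch of one way to restore clustering after a shuffle so that min/max statistics become selective again (hypothetical output path, not part of the original benchmark):

// Sort rows within each partition on the indexed columns before writing;
// sortWithinPartitions avoids triggering a second shuffle.
val clustered = df.sortWithinPartitions("col1", "col3")
clustered.write.parquet("./temp/parquet-test-large-sorted")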
// == Snappy compression ==
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import com.github.sadikovi.spark.riff._

def row(i: Int): Row = {
  Row(i, i, i.toLong, i.toLong, s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i", s"abc$i abc$i abc$i")
}

val schema = StructType(
  StructField("col1", IntegerType) ::
  StructField("col2", IntegerType) ::
  StructField("col3", LongType) ::
  StructField("col4", LongType) ::
  StructField("col5", StringType) ::
  StructField("col6", StringType) ::
  StructField("col7", StringType) ::
  StructField("col8", StringType) :: Nil)

val df = spark.createDataFrame(sc.parallelize(0 until 100000000, 800).map(row), schema)

// Parquet (snappy is Spark's default Parquet codec, so nothing is set):
df.write.parquet("./temp/parquet-test")
// 17/05/13 22:00:42 INFO DAGScheduler: Job 0 finished: parquet at <console>:38, took 359.524599 s
// 4.6G temp/parquet-test/

// Riff:
spark.conf.set("spark.sql.riff.compression.codec", "snappy")
df.write.option("index", "col1,col4,col6").riff("./temp/riff-test")
// 17/05/13 22:13:43 INFO DAGScheduler: Job 0 finished: save at package.scala:42, took 253.434462 s
// 3.4G temp/riff-test/

val p = spark.read.parquet("./temp/parquet-test")
p.filter("col1 = 450239").select("col1", "col4", "col8").collect
// 17/05/13 22:16:26 INFO DAGScheduler: Job 1 finished: collect at <console>:33, took 3.714159 s
// res0: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
p.filter("col8 = 'abc450239 abc450239 abc450239'").select("col1", "col4", "col8").collect
// 17/05/13 22:18:36 INFO DAGScheduler: Job 3 finished: collect at <console>:33, took 11.078386 s
// res2: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
p.filter("col8 = 'abc450239 abc450239 abc450239'").collect
// 17/05/13 22:21:47 INFO DAGScheduler: Job 5 finished: collect at <console>:33, took 31.259570 s

val r = spark.read.riff("./temp/riff-test")
r.filter("col1 = 450239").select("col1", "col4", "col8").collect
// 17/05/13 22:17:18 INFO DAGScheduler: Job 2 finished: collect at <console>:33, took 9.581120 s
// res1: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
r.filter("col8 = 'abc450239 abc450239 abc450239'").select("col1", "col4", "col8").collect
// 17/05/13 22:19:11 INFO DAGScheduler: Job 4 finished: collect at <console>:33, took 7.851032 s
// res3: Array[org.apache.spark.sql.Row] = Array([450239,450239,abc450239 abc450239 abc450239])
r.filter("col8 = 'abc450239 abc450239 abc450239'").collect
// 17/05/13 22:22:22 INFO DAGScheduler: Job 6 finished: collect at <console>:33, took 7.648772 s
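Takeaways from these initial numbers: Riff produces smaller files in all three runs (1.8G vs 1.9G gzip; 2.0G vs 4.6G with default codecs; 3.4G vs 4.6G snappy) and faster writes in two of them. Parquet's advantage on point lookups and range scans depends on the data being naturally ordered (runs 1 and 3) and disappears once rows are shuffled (run 2), where the Riff column indexes keep point lookups under 2 s. The snappy run also shows the cost of reconstructing full rows in Parquet on a string filter: 31.3 s vs 7.6 s for Riff.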