Spark - Parquet files

@aseigneurin, November 15, 2016

Basic file formats - such as CSV, JSON or other text formats - can be useful when exchanging data between applications. When it comes to storing intermediate data between steps of an application, Parquet can provide more advanced capabilities:

  • Support for complex types, as opposed to string-based types (CSV) or a limited type system (JSON only supports strings, basic numbers and booleans).
  • Columnar storage - more efficient when not all the columns are used or when filtering the data.
  • Partitioning - files are partitioned out of the box.
  • Compression - pages can be compressed with Snappy or Gzip, and this preserves the partitioning (a short sketch of these capabilities follows this list).
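
As a quick illustration of the last three points, here is a minimal sketch, not taken from the tests below: the "country" and "accountId" columns and the /tmp path are hypothetical. It writes a DataFrame df as Snappy-compressed, partitioned Parquet, then reads back a single column:

// Out of the box, Spark writes one Parquet part-file per partition of the DataFrame;
// partitionBy() additionally splits the output into one directory per value of the
// hypothetical "country" column, and the pages inside each file are Snappy-compressed
df.write
  .partitionBy("country")
  .option("compression", "snappy")
  .parquet("/tmp/accounts.parquet")

// Columnar storage: reading a single (hypothetical) column does not require
// decoding the other columns of the files
val ids = spark.read.parquet("/tmp/accounts.parquet")
  .select("accountId")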

The tests here are performed with Spark 2.0.1 on a cluster with 3 workers (c4.4xlarge, 16 vCPU and 30 GB each).

Notice that the measured times can vary from one run to another by +/- 5% and should therefore not be compared strictly.

Converting the text files to Parquet

Before comparing the performance between text files and Parquet files, we need to actually convert the text files to Parquet.

Here, we analyze the results obtained with 3 compression settings:

  • none: no compression
  • snappy: provides a good balance between compression and speed. The documentation says: "It does not aim for maximum compression, or compatibility with any other compression library; instead, it aims for very high speeds and reasonable compression."
  • gzip: higher compression than Snappy but very CPU intensive.

Files are read from the local drive and saved to S3.

Here is the code:

// df is the DataFrame loaded from the text file; rootPath is the target S3 prefix
val codecs = Array("none", "snappy", "gzip")
for (codec <- codecs) {
  df.write.format("parquet")
    .option("compression", codec)
    .save(s"${rootPath}.parquet-${codec}")
}

Conversion of the accounts file to Parquet

Compression   File size   Time
none          170 MB      156.6 s
snappy        72 MB       157.8 s
gzip          44 MB       152.2 s

Conversion of the transactions file to Parquet

Compression   File size   Time
none          914 MB      637.6 s
snappy        300 MB      614.8 s
gzip          178 MB      595.6 s

Baseline - Joining 2 DataFrames read from text files

The baseline for testing performance is a join between 2 files:

  • mock_accounts_10M.dat: a JSON file with 10 million accounts, 4.2 GB.
  • mock_transactions_40M_10M.dat: a CSV file with 40 million transactions, 14.4 GB.

The application leverages the DataFrames API of Spark. Here is the code:

import spark.implicits._ // needed for .toDF()

val accountsDF = spark.read.json(accountsFile)

// Transaction (defined elsewhere in the application) parses a CSV line into a typed record
val transactionsDF = spark.sparkContext.textFile(transactionsFile)
  .map(Transaction(_))
  .toDF()

val joinDF = accountsDF.join(transactionsDF, accountsDF("systemOfRecordAcctId") === transactionsDF("accountId"))

Then, either the results are counted:

val count = joinDF.count()

Or they are written to disk as a CSV file:

joinDF.write.csv(resultsFile)

The application behaves as follows:

  • The accounts file is read as 134 partitions (coalesced to 48 partitions by the JSON reader).
  • The transactions file is read as 460 partitions.
  • The join is performed with 200 partitions (the default value of spark.sql.shuffle.partitions).
  • The join yields 39,998,919 results. These partition counts can be checked with the snippet after this list.
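
As a side note, these partition counts are not set anywhere in the application code; they can be inspected at runtime, for instance with a small sketch like this, reusing the DataFrames from the code above:

// Read-side partitioning of the two inputs
println(accountsDF.rdd.getNumPartitions)
println(transactionsDF.rdd.getNumPartitions)

// Partitioning after the shuffle introduced by the join
// (driven by spark.sql.shuffle.partitions, 200 by default)
println(joinDF.rdd.getNumPartitions)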

Results are as follows:

# of partitions   Time (join + count only)   Time (join + write to CSV only)
200               70.53 s                    129.7 s

Results - Joining 2 DataFrames read from Parquet files

In this test, we use the Parquet files compressed with Snappy because:

  • Snappy provides a good compression ratio while not requiring too many CPU resources.
  • Snappy is the default compression method when writing Parquet files with Spark (see the note after this list).
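
On that second point, the default codec can also be changed session-wide through the spark.sql.parquet.compression.codec property, instead of passing .option("compression", ...) on every write. A small sketch, assuming we wanted gzip as the default (the path reuses the rootPath variable from the conversion code above):

// Session-wide default codec for Parquet writes ("snappy" if not set, as noted above)
spark.conf.set("spark.sql.parquet.compression.codec", "gzip")

// This write now uses gzip even without an explicit .option("compression", ...)
df.write.parquet(s"${rootPath}.parquet-gzip-default")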

The code being used here is very similar - we only changed the way the files are read:

val accountsDF = spark.read.parquet(accountsFile)

val transactionsDF = spark.read.parquet(transactionsFile)

val joinDF = accountsDF.join(transactionsDF, accountsDF("systemOfRecordAcctId") === transactionsDF("accountId"))

We run this test multiple times by adjusting the spark.sql.shuffle.partitions configuration parameter to see the impact of the number of partitions in the join.
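
For reference, here is a minimal sketch of how this parameter can be set, with 100 partitions as an example value:

// Set on the SparkSession before running the join...
spark.conf.set("spark.sql.shuffle.partitions", "100")

// ...or pass it at submission time:
//   spark-submit --conf spark.sql.shuffle.partitions=100 ...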

Here are the results:

# of partitions   Time (join + count only)   Time (join + write to CSV only)
10                24.99 s                    92.64 s
25                19.12 s                    74.41 s
50                20.97 s                    55.54 s
100               19.10 s                    50.67 s
150               20.29 s                    48.97 s
200               19.75 s                    48.87 s
250               17.11 s                    50.30 s
300               17.57 s                    48.27 s

Here is what we can see:

  • Counting the results is 3.5 times faster than with text files (19.75 s vs 70.53 s). This is because the files don't need to be parsed, and also thanks to the columnar storage: only the columns used in the join are read (see the sketch after this list).
  • Computing the results and writing them to CSV is 2.5 times faster (48.87 s vs 129.7 s). In this test, all the columns need to be read.
  • The best performance is achieved when the join is performed with 100 partitions or more, that is, with at least 2 tasks per CPU.
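
To make the column-pruning point concrete, here is a sketch of what the count-only variant effectively reads: only the two join keys. With Parquet, Spark's optimizer prunes the unused columns on its own, so the explicit select below is only there to make the effect visible:

// Only the join keys are needed to count the matches; with Parquet,
// the other columns are never read from storage
val accountKeys = spark.read.parquet(accountsFile).select("systemOfRecordAcctId")
val transactionKeys = spark.read.parquet(transactionsFile).select("accountId")

val count = accountKeys
  .join(transactionKeys, accountKeys("systemOfRecordAcctId") === transactionKeys("accountId"))
  .count()
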
Comment from @michTalebzadeh:

GZIP provides the best compression for Parquet files: around 100% storage saving, compared to 73% for SNAPPY. My tests were based on Google BigQuery tables stored on Google Cloud Storage and accessed as EXTERNAL tables in BigQuery.

https://www.linkedin.com/pulse/leveraging-google-cloud-storage-cost-saving-bigquery-mich/
