@Fokko · Created October 22, 2019 09:36
Read Delta Lake as plain Apache Parquet

Let's create a very simple Delta Lake table containing three rows (three people) and write it to local disk at /tmp/delta/. The input data and the shell invocation are sketched below.
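The input here is the people.json example file that ships with Spark; a minimal version of it looks like this:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Delta Lake is a separate package, so the shell has to be started with it on the classpath. One way to do that (the delta-core coordinates and version here are an assumption; pick the one matching your Spark and Scala versions):

MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ spark-shell --packages io.delta:delta-core_2.11:0.4.0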

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.json("/tmp/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

scala> df.write.format("delta").save("/tmp/delta/")

Let's see what we got:

MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ ls /tmp/delta
_delta_log								part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet
MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ find /tmp/delta
/tmp/delta
/tmp/delta/.part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet.crc
/tmp/delta/_delta_log
/tmp/delta/_delta_log/00000000000000000000.json
/tmp/delta/part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet

We see one Apache Parquet file, which is what we expect since the table only contains three rows. The _delta_log directory holds the transaction log that Delta uses to keep track of all the Parquet files that make up the table. Right now there is one entry in the log.
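Each log entry is a file of newline-delimited JSON actions, so one quick, hedged way to peek at it from the same shell (a sketch, not an official inspection API; the exact schema depends on your Delta version) is:

scala> spark.read.json("/tmp/delta/_delta_log/").printSchema()

The top-level columns correspond to the actions recorded in this first commit, such as protocol, metaData, and the add entry for the Parquet file.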

Let's do a fancy ACID operation on the data. This can be an UPDATE, DELETE, INSERT, or compaction operation. In this case we want to delete Michael from the table. Sorry, Michael:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/")
deltaTable.delete("name = 'Michael'")

deltaTable.toDF.show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
+----+-------+
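As an aside, the old version is still there: Delta supports time travel, so you can read the table as of an earlier commit. A hedged sketch (versionAsOf is the Delta Lake read option for this; availability depends on your Delta version):

spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/").show()

This returns the original three rows, including Michael.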

Let's look at the directory again:

MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ find /tmp/delta
/tmp/delta
/tmp/delta/.part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet.crc
/tmp/delta/_delta_log
/tmp/delta/_delta_log/00000000000000000001.json
/tmp/delta/_delta_log/00000000000000000000.json
/tmp/delta/.part-00000-b6e706db-01d0-4dad-9866-aea62f7c0824-c000.snappy.parquet.crc
/tmp/delta/part-00000-b6e706db-01d0-4dad-9866-aea62f7c0824-c000.snappy.parquet
/tmp/delta/part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet

We now see two versions of the data. What happens if we read this without Delta?

scala> spark.read.parquet("/tmp/delta/").show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|  30|   Andy|
|  19| Justin|
+----+-------+

We get both versions: a plain Parquet reader does not consult Delta's journal, so it loads all the Parquet files in the directory instead of only the latest version. To fix this there are two options:

  • VACUUM the table (VACUUM ([db_name.]table_name|path) RETAIN 0 HOURS) to get rid of the old versions, as sketched below.
  • Only use the Delta Lake reader, which is available for Spark, but not for all big data technologies.
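A hedged sketch of both options from the Scala API (vacuuming with a 0-hour retention requires disabling a safety check first; the config name matches Delta 0.4.x but may differ in other versions):

// Option 1: vacuum away the files that are no longer part of the latest snapshot.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
deltaTable.vacuum(0)  // retain 0 hours

// Option 2: read through the Delta reader, which does consult the journal.
spark.read.format("delta").load("/tmp/delta/").show()

After the vacuum, a plain Parquet reader sees only the single remaining file, and both readers agree again.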