Let's create a very simple Delta Lake table with three rows (3 people), and write it to the local disk at /tmp/delta/:
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_172)
Type in expressions to have them evaluated.
Type :help for more information.
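For reference, the input file /tmp/people.json is newline-delimited JSON. Based on the inferred schema and the output below, it would look something like this (this is the sample file that ships with Spark):

```json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
```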
scala> val df = spark.read.json("/tmp/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.write.format("delta").save("/tmp/delta/")
Let's see what we got:
MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ ls /tmp/delta
_delta_log part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet
MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ find /tmp/delta
/tmp/delta
/tmp/delta/.part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet.crc
/tmp/delta/_delta_log
/tmp/delta/_delta_log/00000000000000000000.json
/tmp/delta/part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet
We see one Apache Parquet file, which is what we expect since the table only contains three rows. The _delta_log directory holds the transaction log that Delta uses to keep track of all the Parquet files in the directory. We now see one entry in the log.
Let's do some fancy ACID operation on the data. This can be an UPDATE, DELETE, INSERT or compaction operation. In this case we want to delete Michael from the table, sorry Michael:
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/")
deltaTable.delete("name = 'Michael'")
deltaTable.toDF.show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
+----+-------+
Let's look at the directory again:
MacBook-Pro-van-Fokko:incubator-airflow fokkodriesprong$ find /tmp/delta
/tmp/delta
/tmp/delta/.part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet.crc
/tmp/delta/_delta_log
/tmp/delta/_delta_log/00000000000000000001.json
/tmp/delta/_delta_log/00000000000000000000.json
/tmp/delta/.part-00000-b6e706db-01d0-4dad-9866-aea62f7c0824-c000.snappy.parquet.crc
/tmp/delta/part-00000-b6e706db-01d0-4dad-9866-aea62f7c0824-c000.snappy.parquet
/tmp/delta/part-00000-dd10c4fd-20b9-4751-97c5-6c9f1e755e37-c000.snappy.parquet
We now see two versions of the data. What happens if we read this without Delta:
scala> spark.read.parquet("/tmp/delta/").show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
| 30| Andy|
| 19| Justin|
+----+-------+
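The Delta reader itself keeps both versions accessible through time travel. A minimal sketch, assuming the table we wrote above at /tmp/delta/ (requires a Spark session with Delta Lake on the classpath):

```scala
// Read the latest version of the table (Michael is gone)
spark.read.format("delta").load("/tmp/delta/").show()

// Time travel: read version 0 of the table, from before the delete
spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/").show()
```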
We get both versions, because the plain Parquet reader does not consult Delta's transaction log; it simply loads all the Parquet files in the directory, instead of only the latest version. To fix this, there are two options:
- VACUUM the table (VACUUM ([db_name.]table_name|path) RETAIN 0 HOURS) to get rid of the old versions.
- Only use the Delta Lake reader, which ships with Spark, but not with all the other big data technologies.
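The VACUUM can also be issued through the Scala API. A sketch, assuming our table at /tmp/delta/; note that retaining 0 hours is below Delta's default 168-hour retention, so the safety check has to be disabled first:

```scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/")

// Delta refuses a retention shorter than 168 hours unless this check is disabled
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

// Physically remove files no longer referenced by the latest table version
deltaTable.vacuum(0)
```

After this, only the Parquet file for the latest version remains on disk, and the plain Parquet reader shows the expected two rows again.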