@MrPowers
Created July 17, 2022 16:58
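"""Interop check: write a Delta table with delta-rs, read it with Spark.

The test below creates a Delta table with the deltalake (delta-rs) Python
package, then uses PySpark with delta-spark and chispa to verify that the
table can be read, appended to, time traveled, overwritten, and vacuumed.
"""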
import pathlib
import shutil
import deltalake as dl
import pandas as pd
import pyarrow.dataset as ds
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
import chispa
builder = (
    SparkSession.builder.appName("dat")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.databricks.delta.schema.autoMerge.enabled", True)
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
def test_read_delta_rs_with_spark_delta():
    cwd = pathlib.Path().resolve()
    dirpath = pathlib.Path("tmp") / "delta-table"
    if dirpath.exists() and dirpath.is_dir():
        shutil.rmtree(dirpath)

    # create Delta Lake with delta-rs
    df = pd.DataFrame({"num": [1, 2, 3], "letter": ["a", "b", "c"]})
    dl.writer.write_deltalake(f"{cwd}/tmp/delta-table", df)

    # read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df)

    # insert more data into Delta Lake with delta-rs
    df = pd.DataFrame({"num": [77, 88, 99], "letter": ["x", "y", "z"]})
    dl.writer.write_deltalake(f"{cwd}/tmp/delta-table", df, mode="append")

    # read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
        (77, "x"),
        (88, "y"),
        (99, "z"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df, ignore_row_order=True)

    # assert you can time travel to version 0 with Spark
    df0 = spark.read.format("delta").option("versionAsOf", "0").load("tmp/delta-table")
    expected_data = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df0, expected_df)

    # delete rows with delta-rs by filtering the table and overwriting it
    dt = dl.DeltaTable("./tmp/delta-table")
    dataset = dt.to_pyarrow_dataset()
    condition = (ds.field("num") > 1.0) & (ds.field("num") < 99.0)
    filtered = dataset.to_table(filter=condition).to_pandas()
    dl.writer.write_deltalake(f"{cwd}/tmp/delta-table", filtered, mode="overwrite")

    # read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (2, "b"),
        (3, "c"),
        (77, "x"),
        (88, "y"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df, ignore_row_order=True)

    # vacuum old files with delta-rs
    dt = dl.DeltaTable("./tmp/delta-table")
    dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)

    # check you can still read the Delta Lake with Spark after the vacuum
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (2, "b"),
        (3, "c"),
        (77, "x"),
        (88, "y"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df, ignore_row_order=True)

    # cleanup files now that the test is done
    dirpath = pathlib.Path("tmp") / "delta-table"
    if dirpath.exists() and dirpath.is_dir():
        shutil.rmtree(dirpath)
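
# A minimal sketch for exercising this check directly (outside pytest),
# assuming the deltalake, delta-spark, pyspark, pandas, pyarrow, and chispa
# packages are installed in the environment.
if __name__ == "__main__":
    test_read_delta_rs_with_spark_delta()
    print("delta-rs / Spark Delta Lake interop checks passed")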