import pathlib
import shutil

import deltalake as dl
import pandas as pd
import pyarrow.dataset as ds
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
import chispa

builder = (
    SparkSession.builder.appName("dat")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.databricks.delta.schema.autoMerge.enabled", True)
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
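# (configure_spark_with_delta_pip above injects the Delta Lake JAR dependency,
# via spark.jars.packages, that matches the installed delta-spark pip package)

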
def test_read_delta_rs_with_spark_delta():
    cwd = pathlib.Path().resolve()
    dirpath = pathlib.Path("tmp") / "delta-table"
    if dirpath.exists() and dirpath.is_dir():
        shutil.rmtree(dirpath)

    # create Delta Lake with delta-rs
    df = pd.DataFrame({"num": [1, 2, 3], "letter": ["a", "b", "c"]})
    dl.writer.write_deltalake(f"{cwd}/tmp/delta-table", df)

    # read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df)

    # insert more data into Delta Lake with delta-rs
    df = pd.DataFrame({"num": [77, 88, 99], "letter": ["x", "y", "z"]})
    dl.writer.write_deltalake(f"{cwd}/tmp/delta-table", df, mode="append")
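    # (the initial write above created table version 0; this append creates
    # version 1, which the time travel check below relies on)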

    # read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
        (77, "x"),
        (88, "y"),
        (99, "z"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df, ignore_row_order=True)

    # Assert you can time travel with Spark
    df0 = spark.read.format("delta").option("versionAsOf", "0").load("tmp/delta-table")
    expected_data = [
        (1, "a"),
        (2, "b"),
        (3, "c"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df0, expected_df)

    # Delete rows with delta-rs
    dt = dl.DeltaTable("./tmp/delta-table")
    dataset = dt.to_pyarrow_dataset()
    condition = (ds.field("num") > 1.0) & (ds.field("num") < 99.0)
    filtered = dataset.to_table(filter=condition).to_pandas()
    dl.writer.write_deltalake(f"{cwd}/tmp/delta-table", filtered, mode="overwrite")
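    # Note: the "delete" above is emulated: the rows to keep are read through a
    # filtered pyarrow dataset and written back with mode="overwrite", rather than
    # running a row-level DELETE through delta-rs.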

    # read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (2, "b"),
        (3, "c"),
        (77, "x"),
        (88, "y"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df, ignore_row_order=True)

    # vacuum old files
    dt = dl.DeltaTable("./tmp/delta-table")
    dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
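    # retention_hours=0 with enforce_retention_duration=False lets vacuum remove
    # unreferenced data files immediately; dry_run=False actually deletes them
    # instead of only listing them.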

    # check you can still read Delta Lake with Spark
    df = spark.read.format("delta").load("./tmp/delta-table")
    expected_data = [
        (2, "b"),
        (3, "c"),
        (77, "x"),
        (88, "y"),
    ]
    expected_df = spark.createDataFrame(expected_data, ["num", "letter"])
    chispa.assert_df_equality(df, expected_df, ignore_row_order=True)

    # cleanup files now that the test is done
    dirpath = pathlib.Path("tmp") / "delta-table"
    if dirpath.exists() and dirpath.is_dir():
        shutil.rmtree(dirpath)
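

# A minimal sketch for running the check directly instead of through pytest
# (assumes no additional test fixtures are needed).
if __name__ == "__main__":
    test_read_delta_rs_with_spark_delta()
    print("delta-rs / Spark round-trip checks passed")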