Using lakeFS and Delta Lake with PySpark in a Jupyter Notebook
@rmoff, created April 6, 2023 20:21
{
"cells": [
{
"cell_type": "markdown",
"id": "78797e22",
"metadata": {},
"source": [
"# lakeFS and Delta\n",
"\n",
"This uses the [Everything Bagel](https://github.com/treeverse/lakeFS/tree/master/deployments/compose) Docker Compose environment.\n",
"\n",
"[@rmoff](https://twitter.com/rmoff/) "
]
},
{
"cell_type": "markdown",
"id": "3a642edb",
"metadata": {},
"source": [
"## Setup"
]
},
{
"cell_type": "markdown",
"id": "3180968f",
"metadata": {},
"source": [
"Display version numbers just for info"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "45ba6754",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Kernel: /opt/conda/bin/python\n",
"Python version: 3.9.7 | packaged by conda-forge | (default, Oct 10 2021, 15:08:54) \n",
"[GCC 9.4.0]\n",
"PySpark version: 3.2.0\n"
]
}
],
"source": [
"import sys\n",
"print(\"Kernel:\", sys.executable)\n",
"print(\"Python version:\", sys.version)\n",
"\n",
"import pyspark\n",
"print(\"PySpark version:\", pyspark.__version__)\n"
]
},
{
"cell_type": "markdown",
"id": "c1e3db35",
"metadata": {},
"source": [
"### Spark\n",
"\n",
"_With the necessary Delta Lake config too_"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "6d3747db",
"metadata": {
"scrolled": false
},
"outputs": [],
"source": [
"from pyspark import SparkFiles\n",
"from pyspark.sql.session import SparkSession\n",
"\n",
"spark = (\n",
" SparkSession.builder.master(\"local[*]\")\n",
" .config(\"spark.jars.packages\", \"io.delta:delta-core_2.12:2.0.0\")\n",
" .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n",
" .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\n",
" .config(\"spark.delta.logStore.class\", \"org.apache.spark.sql.delta.storage.S3SingleDriverLogStore\")\n",
" .config(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\")\n",
" .config(\"spark.hadoop.fs.s3a.endpoint\", \"http://lakefs:8000\")\n",
" .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\")\n",
" .config(\"spark.hadoop.fs.s3a.access.key\", \"AKIAIOSFODNN7EXAMPLE\")\n",
" .config(\"spark.hadoop.fs.s3a.secret.key\", \"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY\") \n",
" .getOrCreate()\n",
")"
]
},
{
"cell_type": "markdown",
"id": "644a7daa",
"metadata": {},
"source": [
"#### Test Delta Lake - write/read local"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "e70664be",
"metadata": {},
"outputs": [],
"source": [
"data = spark.range(0, 5)\n",
"data.write.format(\"delta\").mode(\"overwrite\").save(\"/tmp/delta-table\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "b53a65c0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+\n",
"| id|\n",
"+---+\n",
"| 4|\n",
"| 3|\n",
"| 2|\n",
"| 0|\n",
"| 1|\n",
"+---+\n",
"\n"
]
}
],
"source": [
"df = spark.read.format(\"delta\").load(\"/tmp/delta-table\")\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "ffd24df2",
"metadata": {},
"source": [
"#### Test Delta Lake - write/read lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "a1d64961",
"metadata": {},
"outputs": [],
"source": [
"data = spark.range(0, 5)\n",
"data.write.format(\"delta\").mode('overwrite').save('s3a://example/main/test')"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "b5b4b61e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+\n",
"| id|\n",
"+---+\n",
"| 0|\n",
"| 4|\n",
"| 3|\n",
"| 1|\n",
"| 2|\n",
"+---+\n",
"\n"
]
}
],
"source": [
"df = spark.read.format(\"delta\").load('s3a://example/main/test')\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "f824b929",
"metadata": {},
"source": [
"### lakeFS"
]
},
{
"cell_type": "markdown",
"id": "49c3136c",
"metadata": {},
"source": [
"#### Install libraries\n",
"\n",
"(could be built into the `Dockerfile`)"
]
},
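{
"cell_type": "markdown",
"id": "c0ffee00",
"metadata": {},
"source": [
"If baking the dependency into the image instead, something like this in the notebook service's `Dockerfile` would do it (a sketch; the base image name is an assumption, not taken from the Everything Bagel compose file):\n",
"\n",
"```dockerfile\n",
"FROM jupyter/pyspark-notebook\n",
"RUN pip install lakefs_client\n",
"```"
]
},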
{
"cell_type": "code",
"execution_count": 7,
"id": "f3c34c28",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: lakefs_client in /opt/conda/lib/python3.9/site-packages (0.98.0)\r\n",
"Requirement already satisfied: urllib3>=1.25.3 in /opt/conda/lib/python3.9/site-packages (from lakefs_client) (1.26.8)\r\n",
"Requirement already satisfied: python-dateutil in /opt/conda/lib/python3.9/site-packages (from lakefs_client) (2.8.2)\r\n",
"Requirement already satisfied: six>=1.5 in /opt/conda/lib/python3.9/site-packages (from python-dateutil->lakefs_client) (1.16.0)\r\n"
]
}
],
"source": [
"import sys\n",
"!{sys.executable} -m pip install lakefs_client"
]
},
{
"cell_type": "markdown",
"id": "ef108b55",
"metadata": {},
"source": [
"#### Config"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "b2e44584",
"metadata": {},
"outputs": [],
"source": [
"import lakefs_client\n",
"from lakefs_client import models\n",
"from lakefs_client.client import LakeFSClient\n",
"from lakefs_client.api import branches_api\n",
"from lakefs_client.api import commits_api\n",
"\n",
"# lakeFS credentials and endpoint\n",
"configuration = lakefs_client.Configuration()\n",
"configuration.username = 'AKIAIOSFODNN7EXAMPLE'\n",
"configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'\n",
"configuration.host = 'http://lakefs:8000'\n",
"\n",
"client = LakeFSClient(configuration)\n",
"api_client = lakefs_client.ApiClient(configuration)"
]
},
{
"cell_type": "markdown",
"id": "0047139f",
"metadata": {},
"source": [
"#### List the current branches in the repository\n",
"\n",
"https://pydocs.lakefs.io/docs/BranchesApi.html#list_branches"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "9651e04e",
"metadata": {},
"outputs": [],
"source": [
"repo='example'"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "bdaa7756",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'main'"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"for b in client.branches.list_branches(repo).results:\n",
" display(b.id)"
]
},
{
"cell_type": "markdown",
"id": "0a45118a",
"metadata": {},
"source": [
"## Load some data into lakeFS"
]
},
{
"cell_type": "markdown",
"id": "ff22dfb5",
"metadata": {},
"source": [
"Read a parquet file from URL"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "8a87c7ef",
"metadata": {},
"outputs": [],
"source": [
"# The sample parquet file is Apache 2.0 licensed so perhaps include it in the Everything Bagel distribution? \n",
"url='https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata1.parquet?raw=true'\n",
"spark.sparkContext.addFile(url)\n",
"df = spark.read.parquet(\"file://\" + SparkFiles.get(\"userdata1.parquet\"))"
]
},
{
"cell_type": "markdown",
"id": "7c22768b",
"metadata": {},
"source": [
"How many rows of data?"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "6ec747fe",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1000"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"display(df.count())"
]
},
{
"cell_type": "markdown",
"id": "08f17847",
"metadata": {},
"source": [
"What does the data look like?"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "b6268496",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0--------------------------------\n",
" registration_dttm | 2016-02-03 07:55:29 \n",
" id | 1 \n",
" first_name | Amanda \n",
" last_name | Jordan \n",
" email | ajordan0@com.com \n",
" gender | Female \n",
" ip_address | 1.197.201.2 \n",
" cc | 6759521864920116 \n",
" country | Indonesia \n",
" birthdate | 3/8/1971 \n",
" salary | 49756.53 \n",
" title | Internal Auditor \n",
" comments | 1E+02 \n",
"only showing top 1 row\n",
"\n"
]
},
{
"data": {
"text/plain": [
"None"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"display(df.show(n=1,vertical=True))"
]
},
{
"cell_type": "markdown",
"id": "b41dab2b",
"metadata": {},
"source": [
"## Write data to lakeFS (on the `main` branch) in Delta format"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "40be34f5",
"metadata": {},
"outputs": [],
"source": [
"branch='main'"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "68718621",
"metadata": {},
"outputs": [],
"source": [
"df.write.format(\"delta\").mode('overwrite').save('s3a://'+repo+'/'+branch+'/demo/users')"
]
},
{
"cell_type": "markdown",
"id": "6963a378",
"metadata": {},
"source": [
"#### 👉🏻 [The data as seen from lakeFS](http://localhost:8000/repositories/example/objects?ref=main&path=demo%2Fusers%2F)"
]
},
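{
"cell_type": "markdown",
"id": "c0ffee01",
"metadata": {},
"source": [
"The same listing can also be fetched programmatically. A sketch, assuming the `ObjectsApi` exposed by the Python client ([docs](https://pydocs.lakefs.io/docs/ObjectsApi.html)):"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0ffee02",
"metadata": {},
"outputs": [],
"source": [
"# List the Delta files just written, via the lakeFS API rather than the UI.\n",
"# Assumes ObjectsApi.list_objects as documented at pydocs.lakefs.io.\n",
"for obj in client.objects.list_objects(repo, branch, prefix='demo/users/').results:\n",
"    print(obj.path, obj.size_bytes)"
]
},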
{
"cell_type": "markdown",
"id": "1bcdcb3c",
"metadata": {},
"source": [
"### Commit the new file in `main`\n",
"\n",
"https://pydocs.lakefs.io/docs/CommitsApi.html#commit"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "0078cc8a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812443,\n",
" 'id': '642d398f9279554c2f2c643492959cf3cd92f2db05a8c666265c45d19eccd130',\n",
" 'message': 'Initial user data load',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['7328d2f124d036865a3101e76b9fff9b77425c1ca5cf872329442fe76f97ad5c']}"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from lakefs_client.api import commits_api\n",
"from lakefs_client.model.commit import Commit\n",
"from lakefs_client.model.commit_creation import CommitCreation\n",
"\n",
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
" message=\"Initial user data load\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "dd3c5e94",
"metadata": {},
"source": [
"## Create a branch"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "0fa39ade",
"metadata": {},
"outputs": [],
"source": [
"branch='modify_user_data'"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "fecd1769",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'642d398f9279554c2f2c643492959cf3cd92f2db05a8c666265c45d19eccd130'"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"from lakefs_client.model.branch_creation import BranchCreation\n",
"\n",
"api_instance = branches_api.BranchesApi(api_client)\n",
"branch_creation = BranchCreation(\n",
" name=branch,\n",
" source=\"main\",\n",
") \n",
"\n",
"api_response = api_instance.create_branch(repo, branch_creation)\n",
"display(api_response)"
]
},
{
"cell_type": "markdown",
"id": "69e1a416",
"metadata": {},
"source": [
"### List the current branches in the `example` repository"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "118732d6",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'main'"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"'modify_user_data'"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"for b in client.branches.list_branches(repo).results:\n",
" display(b.id)"
]
},
{
"cell_type": "markdown",
"id": "1378435c",
"metadata": {},
"source": [
"## Add some new data with merge"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "ad246bf7",
"metadata": {},
"outputs": [],
"source": [
"from delta.tables import *\n",
"from pyspark.sql.functions import *"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "fc637861",
"metadata": {},
"outputs": [],
"source": [
"# The sample parquet file is Apache 2.0 licensed so perhaps include it in the Everything Bagel distribution? \n",
"url='https://github.com/Teradata/kylo/blob/master/samples/sample-data/parquet/userdata2.parquet?raw=true'\n",
"spark.sparkContext.addFile(url)\n",
"new_df = spark.read.parquet(\"file://\" + SparkFiles.get(\"userdata2.parquet\"))"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "d2f818de",
"metadata": {},
"outputs": [],
"source": [
"users_deltaTable = DeltaTable.forPath(spark, 's3a://'+repo+'/'+branch+'/demo/users')"
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "b57e93b9",
"metadata": {},
"outputs": [],
"source": [
"users_deltaTable.alias(\"users\").merge(\n",
" source = new_df.alias(\"new_users\"),\n",
" condition = \"users.id = new_users.id\") \\\n",
" .whenNotMatchedInsertAll() \\\n",
" .execute()"
]
},
{
"cell_type": "markdown",
"id": "bb92fada",
"metadata": {},
"source": [
"Commit in lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "86ae6261",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812447,\n",
" 'id': '2356793bc4723c62b829ea76d241a5d9728094812df22700923c4d495e22944a',\n",
" 'message': 'Merge in new user data',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['642d398f9279554c2f2c643492959cf3cd92f2db05a8c666265c45d19eccd130']}"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
" message=\"Merge in new user data\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "1b64c675",
"metadata": {},
"source": [
"## Update some data"
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "28d2041a",
"metadata": {},
"outputs": [],
"source": [
"deltaTable = DeltaTable.forPath(spark, 's3a://'+repo+'/'+branch+'/demo/users')"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "28b1cce1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---------------+\n",
"| country| ip_address|\n",
"+--------+---------------+\n",
"| China| 140.35.109.83|\n",
"|Portugal| 232.234.81.197|\n",
"| China| 246.225.12.189|\n",
"| China|172.215.104.127|\n",
"| China| 191.88.236.116|\n",
"+--------+---------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"deltaTable.toDF().filter(col(\"country\").isin(\"Portugal\", \"China\")).select(\"country\",\"ip_address\").show(5)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "6a2dd8c2",
"metadata": {},
"outputs": [],
"source": [
"deltaTable.update(\n",
" condition = \"country == 'Portugal'\",\n",
" set = { \"ip_address\" : \"'x.x.x.x'\" })"
]
},
{
"cell_type": "code",
"execution_count": 28,
"id": "ab657830",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------+---------------+\n",
"| country| ip_address|\n",
"+--------+---------------+\n",
"| China| 140.35.109.83|\n",
"|Portugal| x.x.x.x|\n",
"| China| 246.225.12.189|\n",
"| China|172.215.104.127|\n",
"| China| 191.88.236.116|\n",
"| China| 65.111.200.146|\n",
"| China| 252.20.193.145|\n",
"|Portugal| x.x.x.x|\n",
"| China| 152.6.235.33|\n",
"| China| 80.111.141.47|\n",
"+--------+---------------+\n",
"only showing top 10 rows\n",
"\n"
]
}
],
"source": [
"deltaTable.toDF().filter(col(\"country\").isin(\"Portugal\", \"China\")).select(\"country\",\"ip_address\").show(10)"
]
},
{
"cell_type": "markdown",
"id": "00cc8abc",
"metadata": {},
"source": [
"Commit in lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "dd7a6843",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812449,\n",
" 'id': 'ff744a27ac9d092a8dc8b209b21699922a7c32829639987a77ce668837f6f69d',\n",
" 'message': 'Mask all IPs for users in Portugal',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['2356793bc4723c62b829ea76d241a5d9728094812df22700923c4d495e22944a']}"
]
},
"execution_count": 29,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
" message=\"Mask all IPs for users in Portugal\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "97239613",
"metadata": {},
"source": [
"## Delete some data"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "60544aeb",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"765"
]
},
"execution_count": 30,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"deltaTable.toDF().filter(col(\"salary\") > 60000).count()"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "ab9a8a5c",
"metadata": {},
"outputs": [],
"source": [
"deltaTable.delete(col(\"salary\") > 60000)"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "c094678f",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 32,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"deltaTable.toDF().filter(col(\"salary\") > 60000).count()"
]
},
{
"cell_type": "markdown",
"id": "295fedaa",
"metadata": {},
"source": [
"Commit in lakeFS"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "210fd6b0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'committer': 'docker',\n",
" 'creation_date': 1680812451,\n",
" 'id': 'a0ddd0a1b3091534e4e7b0384e72c65cad818c32810b52561c2fb69d2ef0b813',\n",
" 'message': 'Delete users with salary over 60k',\n",
" 'meta_range_id': '',\n",
" 'metadata': {},\n",
" 'parents': ['ff744a27ac9d092a8dc8b209b21699922a7c32829639987a77ce668837f6f69d']}"
]
},
"execution_count": 33,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"api_instance = commits_api.CommitsApi(api_client)\n",
"commit_creation = CommitCreation(\n",
" message=\"Delete users with salary over 60k\"\n",
") \n",
"\n",
"api_instance.commit(repo, branch, commit_creation)"
]
},
{
"cell_type": "markdown",
"id": "f24fcd35",
"metadata": {},
"source": [
"### Look at the data and diffs in lakeFS"
]
},
{
"cell_type": "markdown",
"id": "e127c37e",
"metadata": {},
"source": [
"#### 👉🏻 [`main`](http://localhost:8000/repositories/example/objects?ref=main&path=demo%2Fusers%2F)"
]
},
{
"cell_type": "markdown",
"id": "0bb2e574",
"metadata": {},
"source": [
"#### 👉🏻 [`modify_user_data`](http://localhost:8000/repositories/example/objects?ref=modify_user_data&path=demo%2Fusers%2F)"
]
},
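{
"cell_type": "markdown",
"id": "c0ffee03",
"metadata": {},
"source": [
"The branch-to-branch diff can also be pulled through the API. A sketch, assuming `RefsApi.diff_refs` as documented at https://pydocs.lakefs.io/docs/RefsApi.html:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c0ffee04",
"metadata": {},
"outputs": [],
"source": [
"# Show what changed on modify_user_data relative to main.\n",
"for d in client.refs.diff_refs(repo, 'main', 'modify_user_data').results:\n",
"    print(d.type, d.path)"
]
},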
{
"cell_type": "code",
"execution_count": 34,
"id": "6fc556d7",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"1000"
]
},
"execution_count": 34,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"DeltaTable.forPath(spark, 's3a://example/main/demo/users').toDF().count()"
]
},
{
"cell_type": "code",
"execution_count": 35,
"id": "7b05d9a8",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"236"
]
},
"execution_count": 35,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"DeltaTable.forPath(spark, 's3a://example/modify_user_data/demo/users').toDF().count()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.7"
}
},
"nbformat": 4,
"nbformat_minor": 5
}