@mniehoff
Last active February 24, 2025 21:03
How to use DuckDB in Databricks to process data stored in Databricks Unity Catalog. https://www.codecentric.de/wissens-hub/blog/access-databricks-unitycatalog-from-duckdb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "fc6730cd-7169-4533-9eee-2cfa01ea186e",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"# Use DuckDB to Read data in a Databricks Unity Catalog\n",
"\n",
"Motivation: Databricks is a great platform when it comes to data management, governance and etc (mostly due to the unity catalog). But Spark as an engine is just ok'ish, especially when data is not realy big. New engines like polars or duckdb are better suited for this.\n",
"\n",
"Goal: Process data that is stored and managed in unity data catalog with duckdb.\n",
"\n",
"Ideas:\n",
"\n",
"1. the obvious: read the data with spark, convert the data to arrow, use duckdb\n",
"2. the direct: read the data directly with duckdb, using temporary table credentials provide by databricks\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "e9ba7ad5-ccd0-4103-bab1-6f95fa02001f",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"## Prep\n",
"Install dependencies:\n",
"- newest databricks-sdk that include the methods to retrieve temporary credential tokens\n",
"- duckdb\n",
"- azure sdk to read files directly"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "306add71-7a04-48a1-8a2d-2e9a1392e6fc",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"output_type": "stream",
"text": [
"Requirement already satisfied: databricks-sdk in /local_disk0/.ephemeral_nfs/envs/pythonEnv-7e37d14b-f41b-44c7-9a4c-e6f8de21b634/lib/python3.12/site-packages (0.40.0)\nRequirement already satisfied: duckdb in /local_disk0/.ephemeral_nfs/envs/pythonEnv-7e37d14b-f41b-44c7-9a4c-e6f8de21b634/lib/python3.12/site-packages (1.1.3)\nRequirement already satisfied: requests<3,>=2.28.1 in /databricks/python3/lib/python3.12/site-packages (from databricks-sdk) (2.32.2)\nRequirement already satisfied: google-auth~=2.0 in /databricks/python3/lib/python3.12/site-packages (from databricks-sdk) (2.35.0)\nRequirement already satisfied: cachetools<6.0,>=2.0.0 in /databricks/python3/lib/python3.12/site-packages (from google-auth~=2.0->databricks-sdk) (5.3.3)\nRequirement already satisfied: pyasn1-modules>=0.2.1 in /databricks/python3/lib/python3.12/site-packages (from google-auth~=2.0->databricks-sdk) (0.2.8)\nRequirement already satisfied: rsa<5,>=3.1.4 in /databricks/python3/lib/python3.12/site-packages (from google-auth~=2.0->databricks-sdk) (4.9)\nRequirement already satisfied: charset-normalizer<4,>=2 in /databricks/python3/lib/python3.12/site-packages (from requests<3,>=2.28.1->databricks-sdk) (2.0.4)\nRequirement already satisfied: idna<4,>=2.5 in /databricks/python3/lib/python3.12/site-packages (from requests<3,>=2.28.1->databricks-sdk) (3.7)\nRequirement already satisfied: urllib3<3,>=1.21.1 in /databricks/python3/lib/python3.12/site-packages (from requests<3,>=2.28.1->databricks-sdk) (1.26.16)\nRequirement already satisfied: certifi>=2017.4.17 in /databricks/python3/lib/python3.12/site-packages (from requests<3,>=2.28.1->databricks-sdk) (2024.6.2)\nRequirement already satisfied: pyasn1<0.5.0,>=0.4.6 in /databricks/python3/lib/python3.12/site-packages (from pyasn1-modules>=0.2.1->google-auth~=2.0->databricks-sdk) (0.4.8)\n\u001B[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.\u001B[0m\n"
]
}
],
"source": [
"%pip install --upgrade databricks-sdk duckdb"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "0e7d1ec7-7cee-49b2-9f02-c20ec385825d",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"dbutils.library.restartPython()"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "a7675b70-3c87-445a-ab4f-04281f00553e",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"import pyarrow as pa\n",
"import duckdb\n",
"from pyspark.sql import DataFrame \n",
"from databricks.sdk import WorkspaceClient\n",
"from databricks.sdk.service.catalog import TableOperation\n",
"import pprint\n",
"import re"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "4da85fbf-7e19-4aff-9b0e-47b7162a4711",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"## Read data with Spark\n",
"Read the data with Spark, convert to Arrow. \n",
"\n",
"Works without Unity Catalog, but data must fit into the memory of the driver node"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "6cc4dd4e-2809-42c6-8d8a-100218fde8db",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"# read the data with spark\n",
"spark_df = spark.read.table(\"samples.nyctaxi.trips\")\n",
"# register as view for the the spark queries below\n",
"spark_df.createOrReplaceTempView(\"trips\")\n",
"\n",
"# create an arrow table from the spark dataframe that can be queried using duckdb\n",
"# the name of the variable here is the name of the table in duckdb\n",
"trips = spark_df.toArrow()"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "e52fa8d2-7e11-4446-9e8e-2c7f5218e6f9",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"toArrow() is a new method added in Spark 4. \n",
"Spark 4 is not yet released, but Databricks regularly adds new (unreleased) features from the open-source version to their runtime.\n",
"\n",
"The methods works like the toPandas() method, but instead of returning a pandas DataFrame, it returns an Arrow Table. Also the same limitations apply: like with toPandas() all data is loaded into memory of the driver node. Therefor this will not work for Datasets larger than memory. \n",
"That said: arrow is a bit more efficient than pandas in terms of memory usage, so it might work for a bit larger datasets than with pandas. It will definitly be faster in terms of performance for the conversation. \n",
"\n",
"Without toArrow(), you would have to do:\n",
"\n",
"pa.Table.from_batches(spark_df._collect_as_arrow())"
]
},
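{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal fallback sketch for runtimes without toArrow(), using the private _collect_as_arrow() helper mentioned above (not a public API, so it may change between versions); trips_fallback is a name introduced here just for illustration."
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {},
"outputs": [],
"source": [
"# fallback: build the Arrow table from collected record batches\n",
"trips_fallback = pa.Table.from_batches(spark_df._collect_as_arrow())\n",
"duckdb.sql(\"SELECT COUNT(*) FROM trips_fallback\").show()"
]
},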
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "cefea268-0976-473c-adef-966893055c41",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"### Count"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "165abbb4-0fc0-4c3e-9ba1-467f0d3749d9",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"┌──────────────┐\n",
"│ count_star() │\n",
"│ int64 │\n",
"├──────────────┤\n",
"│ 21932 │\n",
"└──────────────┘"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"duckdb.sql(\"SELECT COUNT (*) FROM trips\")"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "fe77ffc7-0a0b-4c16-b64c-69c4292231ae",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/html": [
"<style scoped>\n",
" .table-result-container {\n",
" max-height: 300px;\n",
" overflow: auto;\n",
" }\n",
" table, th, td {\n",
" border: 1px solid black;\n",
" border-collapse: collapse;\n",
" }\n",
" th, td {\n",
" padding: 5px;\n",
" }\n",
" th {\n",
" text-align: left;\n",
" }\n",
"</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>count(1)</th></tr></thead><tbody><tr><td>21932</td></tr></tbody></table></div>"
]
},
"metadata": {
"application/vnd.databricks.v1+output": {
"addedWidgets": {},
"aggData": [],
"aggError": "",
"aggOverflow": false,
"aggSchema": [],
"aggSeriesLimitReached": false,
"aggType": "",
"arguments": {},
"columnCustomDisplayInfos": {},
"data": [
[
21932
]
],
"datasetInfos": [],
"dbfsResultPath": null,
"isJsonSchema": true,
"metadata": {},
"overflow": false,
"plotOptions": {
"customPlotOptions": {},
"displayType": "table",
"pivotAggregation": null,
"pivotColumns": null,
"xColumns": null,
"yColumns": null
},
"removedWidgets": [],
"schema": [
{
"metadata": "{\"__autoGeneratedAlias\":\"true\"}",
"name": "count(1)",
"type": "\"long\""
}
],
"type": "table"
}
},
"output_type": "display_data"
}
],
"source": [
"display(\n",
" spark.sql(\"SELECT COUNT (*) FROM trips\")\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "5517df6a-3f9a-4c79-9d82-f2dedef8fc7c",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"### Some Groups/Aggregations\n",
"Average trip distance and fare amount by pickup zip code"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "e536f9f8-fd0a-4091-9216-ee79bab94b4e",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"sql = \"\"\"\n",
"SELECT pickup_zip, avg(trip_distance), avg(fare_amount) FROM trips GROUP BY pickup_zip ORDER BY pickup_zip LIMIT 10\n",
"\"\"\""
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "6a732b91-113a-40e6-8468-737e33b7e4f9",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"┌────────────┬────────────────────┬──────────────────┐\n",
"│ pickup_zip │ avg(trip_distance) │ avg(fare_amount) │\n",
"│ int32 │ double │ double │\n",
"├────────────┼────────────────────┼──────────────────┤\n",
"│ 7002 │ 1.595 │ 10.5 │\n",
"│ 7030 │ 6.3 │ 40.0 │\n",
"│ 7086 │ 0.0 │ 40.0 │\n",
"│ 7087 │ 8.67 │ 31.0 │\n",
"│ 7114 │ 0.0 │ 105.0 │\n",
"│ 7310 │ 0.0 │ 105.0 │\n",
"│ 7311 │ 2.0 │ 60.0 │\n",
"│ 7718 │ 0.8 │ 5.0 │\n",
"│ 7737 │ 2.92 │ 15.5 │\n",
"│ 7974 │ 0.0 │ 188.0 │\n",
"├────────────┴────────────────────┴──────────────────┤\n",
"│ 10 rows 3 columns │\n",
"└────────────────────────────────────────────────────┘"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"duckdb.sql(sql)"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "321a3fb3-ba68-49b9-9ba8-d8d764f67aef",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/html": [
"<style scoped>\n",
" .table-result-container {\n",
" max-height: 300px;\n",
" overflow: auto;\n",
" }\n",
" table, th, td {\n",
" border: 1px solid black;\n",
" border-collapse: collapse;\n",
" }\n",
" th, td {\n",
" padding: 5px;\n",
" }\n",
" th {\n",
" text-align: left;\n",
" }\n",
"</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>pickup_zip</th><th>avg(trip_distance)</th><th>avg(fare_amount)</th></tr></thead><tbody><tr><td>7002</td><td>1.595</td><td>10.5</td></tr><tr><td>7030</td><td>6.3</td><td>40.0</td></tr><tr><td>7086</td><td>0.0</td><td>40.0</td></tr><tr><td>7087</td><td>8.67</td><td>31.0</td></tr><tr><td>7114</td><td>0.0</td><td>105.0</td></tr><tr><td>7310</td><td>0.0</td><td>105.0</td></tr><tr><td>7311</td><td>2.0</td><td>60.0</td></tr><tr><td>7718</td><td>0.8</td><td>5.0</td></tr><tr><td>7737</td><td>2.92</td><td>15.5</td></tr><tr><td>7974</td><td>0.0</td><td>188.0</td></tr></tbody></table></div>"
]
},
"metadata": {
"application/vnd.databricks.v1+output": {
"addedWidgets": {},
"aggData": [],
"aggError": "",
"aggOverflow": false,
"aggSchema": [],
"aggSeriesLimitReached": false,
"aggType": "",
"arguments": {},
"columnCustomDisplayInfos": {},
"data": [
[
7002,
1.595,
10.5
],
[
7030,
6.3,
40.0
],
[
7086,
0.0,
40.0
],
[
7087,
8.67,
31.0
],
[
7114,
0.0,
105.0
],
[
7310,
0.0,
105.0
],
[
7311,
2.0,
60.0
],
[
7718,
0.8,
5.0
],
[
7737,
2.92,
15.5
],
[
7974,
0.0,
188.0
]
],
"datasetInfos": [],
"dbfsResultPath": null,
"isJsonSchema": true,
"metadata": {},
"overflow": false,
"plotOptions": {
"customPlotOptions": {},
"displayType": "table",
"pivotAggregation": null,
"pivotColumns": null,
"xColumns": null,
"yColumns": null
},
"removedWidgets": [],
"schema": [
{
"metadata": "{}",
"name": "pickup_zip",
"type": "\"integer\""
},
{
"metadata": "{\"__autoGeneratedAlias\":\"true\"}",
"name": "avg(trip_distance)",
"type": "\"double\""
},
{
"metadata": "{\"__autoGeneratedAlias\":\"true\"}",
"name": "avg(fare_amount)",
"type": "\"double\""
}
],
"type": "table"
}
},
"output_type": "display_data"
}
],
"source": [
"display(\n",
" spark.sql(sql)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "7a5e4c13-2afe-44ed-84f1-8ab7449e78f7",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"### From Arrow/Duckdb back to Spark DataFrame"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "1ad5d62f-840b-483e-a7e3-5c363e28a6af",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"In Spark 4 createDataFrame accepts an arrow table as argument. \n",
"The feature is available from Databricks Runtime 16.x and newer. \n",
"For older runtimes, duckdb can also convert to pandas (slower)"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "45f70110-a4a7-4fd8-aca7-144966a0142b",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/html": [
"<style scoped>\n",
" .table-result-container {\n",
" max-height: 300px;\n",
" overflow: auto;\n",
" }\n",
" table, th, td {\n",
" border: 1px solid black;\n",
" border-collapse: collapse;\n",
" }\n",
" th, td {\n",
" padding: 5px;\n",
" }\n",
" th {\n",
" text-align: left;\n",
" }\n",
"</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>pickup_zip</th><th>avg(trip_distance)</th><th>avg(fare_amount)</th></tr></thead><tbody><tr><td>7002</td><td>1.595</td><td>10.5</td></tr><tr><td>7030</td><td>6.3</td><td>40.0</td></tr><tr><td>7086</td><td>0.0</td><td>40.0</td></tr><tr><td>7087</td><td>8.67</td><td>31.0</td></tr><tr><td>7114</td><td>0.0</td><td>105.0</td></tr><tr><td>7310</td><td>0.0</td><td>105.0</td></tr><tr><td>7311</td><td>2.0</td><td>60.0</td></tr><tr><td>7718</td><td>0.8</td><td>5.0</td></tr><tr><td>7737</td><td>2.92</td><td>15.5</td></tr><tr><td>7974</td><td>0.0</td><td>188.0</td></tr></tbody></table></div>"
]
},
"metadata": {
"application/vnd.databricks.v1+output": {
"addedWidgets": {},
"aggData": [],
"aggError": "",
"aggOverflow": false,
"aggSchema": [],
"aggSeriesLimitReached": false,
"aggType": "",
"arguments": {},
"columnCustomDisplayInfos": {},
"data": [
[
7002,
1.595,
10.5
],
[
7030,
6.3,
40.0
],
[
7086,
0.0,
40.0
],
[
7087,
8.67,
31.0
],
[
7114,
0.0,
105.0
],
[
7310,
0.0,
105.0
],
[
7311,
2.0,
60.0
],
[
7718,
0.8,
5.0
],
[
7737,
2.92,
15.5
],
[
7974,
0.0,
188.0
]
],
"datasetInfos": [],
"dbfsResultPath": null,
"isJsonSchema": true,
"metadata": {},
"overflow": false,
"plotOptions": {
"customPlotOptions": {},
"displayType": "table",
"pivotAggregation": null,
"pivotColumns": null,
"xColumns": null,
"yColumns": null
},
"removedWidgets": [],
"schema": [
{
"metadata": "{}",
"name": "pickup_zip",
"type": "\"integer\""
},
{
"metadata": "{}",
"name": "avg(trip_distance)",
"type": "\"double\""
},
{
"metadata": "{}",
"name": "avg(fare_amount)",
"type": "\"double\""
}
],
"type": "table"
}
},
"output_type": "display_data"
}
],
"source": [
"# using pandas as intermediate\n",
"result = duckdb.sql(sql).df() # df creates a pandas dataframe\n",
"display(\n",
" spark.createDataFrame(result)\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "a01afee2-6660-4a44-a729-42c80adc906f",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "display_data",
"data": {
"text/html": [
"<style scoped>\n",
" .table-result-container {\n",
" max-height: 300px;\n",
" overflow: auto;\n",
" }\n",
" table, th, td {\n",
" border: 1px solid black;\n",
" border-collapse: collapse;\n",
" }\n",
" th, td {\n",
" padding: 5px;\n",
" }\n",
" th {\n",
" text-align: left;\n",
" }\n",
"</style><div class='table-result-container'><table class='table-result'><thead style='background-color: white'><tr><th>pickup_zip</th><th>avg(trip_distance)</th><th>avg(fare_amount)</th></tr></thead><tbody><tr><td>7002</td><td>1.595</td><td>10.5</td></tr><tr><td>7030</td><td>6.3</td><td>40.0</td></tr><tr><td>7086</td><td>0.0</td><td>40.0</td></tr><tr><td>7087</td><td>8.67</td><td>31.0</td></tr><tr><td>7114</td><td>0.0</td><td>105.0</td></tr><tr><td>7310</td><td>0.0</td><td>105.0</td></tr><tr><td>7311</td><td>2.0</td><td>60.0</td></tr><tr><td>7718</td><td>0.8</td><td>5.0</td></tr><tr><td>7737</td><td>2.92</td><td>15.5</td></tr><tr><td>7974</td><td>0.0</td><td>188.0</td></tr></tbody></table></div>"
]
},
"metadata": {
"application/vnd.databricks.v1+output": {
"addedWidgets": {},
"aggData": [],
"aggError": "",
"aggOverflow": false,
"aggSchema": [],
"aggSeriesLimitReached": false,
"aggType": "",
"arguments": {},
"columnCustomDisplayInfos": {},
"data": [
[
7002,
1.595,
10.5
],
[
7030,
6.3,
40.0
],
[
7086,
0.0,
40.0
],
[
7087,
8.67,
31.0
],
[
7114,
0.0,
105.0
],
[
7310,
0.0,
105.0
],
[
7311,
2.0,
60.0
],
[
7718,
0.8,
5.0
],
[
7737,
2.92,
15.5
],
[
7974,
0.0,
188.0
]
],
"datasetInfos": [],
"dbfsResultPath": null,
"isJsonSchema": true,
"metadata": {},
"overflow": false,
"plotOptions": {
"customPlotOptions": {},
"displayType": "table",
"pivotAggregation": null,
"pivotColumns": null,
"xColumns": null,
"yColumns": null
},
"removedWidgets": [],
"schema": [
{
"metadata": "{}",
"name": "pickup_zip",
"type": "\"integer\""
},
{
"metadata": "{}",
"name": "avg(trip_distance)",
"type": "\"double\""
},
{
"metadata": "{}",
"name": "avg(fare_amount)",
"type": "\"double\""
}
],
"type": "table"
}
},
"output_type": "display_data"
}
],
"source": [
"# using arrow as intermediate\n",
"# needs Databricks Runtime 16 or newer\n",
"result = duckdb.sql(sql).arrow()\n",
"display(\n",
" spark.createDataFrame(result)\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "d5d242a2-376c-471d-a435-f51ce6998e8b",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"# Read data directly from UnityCatalog"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "d55ce532-fd62-4462-b90b-f07c96b875d6",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"Instead of using spark to read from the unity catalog, we query the underlying data directly with duckdb.\n",
"\n",
"This also works outside of databricks"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "73cc8129-1035-4b39-a0a0-3266d64b0436",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"## Get the temporary credential"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "3a84d314-9f8b-4685-97e3-66676946018b",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"w = WorkspaceClient(\n",
" host=spark.conf.get(\n",
" \"spark.databricks.workspaceUrl\"\n",
" )\n",
")\n",
"\n",
"def get_temporary_credentials_for_table(table: str):\n",
" table_id = w.tables.get(table).table_id\n",
" return w.temporary_table_credentials.generate_temporary_table_credentials(table_id=table_id, operation=TableOperation.READ)"
]
},
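{
"cell_type": "markdown",
"metadata": {},
"source": [
"Outside of Databricks, the workspace URL cannot be read from the Spark config. A minimal sketch of configuring the client explicitly, assuming a personal access token; DATABRICKS_HOST and DATABRICKS_TOKEN are placeholder environment variables, not something this notebook sets up."
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# only relevant outside of Databricks: configure host and token explicitly\n",
"if \"DATABRICKS_HOST\" in os.environ and \"DATABRICKS_TOKEN\" in os.environ:\n",
"    w = WorkspaceClient(\n",
"        host=os.environ[\"DATABRICKS_HOST\"],\n",
"        token=os.environ[\"DATABRICKS_TOKEN\"],\n",
"    )"
]
},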
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "2237bca0-955e-4833-aaf8-bf523cb0a2d7",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [],
"source": [
"cred = get_temporary_credentials_for_table(\"samples.nyctaxi.trips\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "bbba3893-f691-4e83-a693-89cab83d4d56",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"## Store the credential in duckdb"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "a0794912-46c3-4da9-bfdb-65bbafb1ba40",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/plain": [
"┌─────────┐\n",
"│ Success │\n",
"│ boolean │\n",
"├─────────┤\n",
"│ true │\n",
"└─────────┘"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"storage_account_name = re.search('@(.*).dfs.', cred.url).group(1)\n",
"sql = f\"\"\"\n",
"SET azure_transport_option_type = 'curl';\n",
"CREATE OR REPLACE SECRET (\n",
" TYPE AZURE,\n",
" CONNECTION_STRING 'AccountName={storage_account_name};SharedAccessSignature={cred.azure_user_delegation_sas.sas_token}'\n",
");\n",
"\"\"\"\n",
"duckdb.sql(sql)"
]
},
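{
"cell_type": "markdown",
"metadata": {},
"source": [
"The cell above assumes the table lives in Azure storage (the temporary credential carries a SAS token). For workspaces on AWS, the response should carry temporary AWS keys instead. Below is a rough sketch of the equivalent DuckDB S3 secret; the aws_temp_credentials field names and the region are assumptions to verify against the databricks-sdk docs."
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {},
"outputs": [],
"source": [
"# sketch for AWS-backed tables; field names and region are assumptions\n",
"if cred.aws_temp_credentials is not None:\n",
"    aws = cred.aws_temp_credentials\n",
"    duckdb.sql(f\"\"\"\n",
"    CREATE OR REPLACE SECRET (\n",
"        TYPE S3,\n",
"        KEY_ID '{aws.access_key_id}',\n",
"        SECRET '{aws.secret_access_key}',\n",
"        SESSION_TOKEN '{aws.session_token}',\n",
"        REGION 'us-east-1'\n",
"    );\n",
"    \"\"\")"
]
},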
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "e8c69363-087c-41dd-98d7-f2c0ca58cffd",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"source": [
"## Read from duckdb"
]
},
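{
"cell_type": "code",
"execution_count": 0,
"metadata": {},
"outputs": [],
"source": [
"# delta_scan() comes from DuckDB's delta extension. Recent DuckDB versions usually\n",
"# autoload it on first use, but it can also be installed and loaded explicitly\n",
"# (a sketch, assuming the cluster can download the extension):\n",
"duckdb.sql(\"INSTALL delta; LOAD delta;\")"
]
},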
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "e289ebd0-5718-4c5d-a413-a8bd27e7391f",
"showTitle": false,
"tableResultSettingsMap": {},
"title": ""
}
},
"outputs": [
{
"output_type": "stream",
"name": "stdout",
"output_type": "stream",
"text": [
"┌──────────────────────────┬──────────────────────────┬───────────────┬─────────────┬────────────┬─────────────┐\n│ tpep_pickup_datetime │ tpep_dropoff_datetime │ trip_distance │ fare_amount │ pickup_zip │ dropoff_zip │\n│ timestamp with time zone │ timestamp with time zone │ double │ double │ int32 │ int32 │\n├──────────────────────────┼──────────────────────────┼───────────────┼─────────────┼────────────┼─────────────┤\n│ 2016-02-13 21:47:53+00 │ 2016-02-13 21:57:15+00 │ 1.4 │ 8.0 │ 10103 │ 10110 │\n│ 2016-02-13 18:29:09+00 │ 2016-02-13 18:37:23+00 │ 1.31 │ 7.5 │ 10023 │ 10023 │\n│ 2016-02-06 19:40:58+00 │ 2016-02-06 19:52:32+00 │ 1.8 │ 9.5 │ 10001 │ 10018 │\n│ 2016-02-12 19:06:43+00 │ 2016-02-12 19:20:54+00 │ 2.3 │ 11.5 │ 10044 │ 10111 │\n│ 2016-02-23 10:27:56+00 │ 2016-02-23 10:58:33+00 │ 2.6 │ 18.5 │ 10199 │ 10022 │\n│ 2016-02-13 00:41:43+00 │ 2016-02-13 00:46:52+00 │ 1.4 │ 6.5 │ 10023 │ 10069 │\n│ 2016-02-18 23:49:53+00 │ 2016-02-19 00:12:53+00 │ 10.4 │ 31.0 │ 11371 │ 10003 │\n│ 2016-02-18 20:21:45+00 │ 2016-02-18 20:38:23+00 │ 10.15 │ 28.5 │ 11371 │ 11201 │\n│ 2016-02-03 10:47:50+00 │ 2016-02-03 11:07:06+00 │ 3.27 │ 15.0 │ 10014 │ 10023 │\n│ 2016-02-19 01:26:39+00 │ 2016-02-19 01:40:01+00 │ 4.42 │ 15.0 │ 10003 │ 11222 │\n├──────────────────────────┴──────────────────────────┴───────────────┴─────────────┴────────────┴─────────────┤\n│ 10 rows 6 columns │\n└──────────────────────────────────────────────────────────────────────────────────────────────────────────────┘\n\n"
]
}
],
"source": [
"sql = f\"\"\"\n",
"SELECT * FROM delta_scan(\"{cred.url}\") LIMIT 10;\n",
"\"\"\"\n",
"duckdb.sql(sql).show()"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"computePreferences": null,
"dashboards": [],
"environmentMetadata": {
"base_environment": "",
"client": "1"
},
"language": "python",
"notebookMetadata": {
"mostRecentlyExecutedCommandWithImplicitDF": {
"commandId": 5393086742018362,
"dataframes": [
"_sqldf"
]
},
"pythonIndentUnit": 4
},
"notebookName": "DuckDB in Databricks - For Blog",
"widgets": {}
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
# Databricks notebook source
# MAGIC %md
# MAGIC # Use DuckDB to Read data in a Databricks Unity Catalog
# MAGIC
# MAGIC Motivation: Databricks is a great platform when it comes to data management, governance, and so on (mostly thanks to Unity Catalog). But Spark as an engine is just okay-ish, especially when the data is not really big. Newer engines like Polars or DuckDB are better suited for this.
# MAGIC
# MAGIC Goal: Process data that is stored and managed in Unity Catalog with DuckDB.
# MAGIC
# MAGIC Ideas:
# MAGIC
# MAGIC 1. The obvious way: read the data with Spark, convert it to Arrow, and query it with DuckDB.
# MAGIC 2. The direct way: read the data directly with DuckDB, using temporary table credentials provided by Databricks.
# MAGIC
# COMMAND ----------
# MAGIC %md
# MAGIC ## Prep
# MAGIC Install dependencies:
# MAGIC - the newest databricks-sdk, which includes the methods to retrieve temporary table credentials
# MAGIC - duckdb
# MAGIC - the Azure SDK to read files directly
# COMMAND ----------
# MAGIC %pip install --upgrade databricks-sdk duckdb
# COMMAND ----------
dbutils.library.restartPython()
# COMMAND ----------
import pyarrow as pa
import duckdb
from pyspark.sql import DataFrame
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import TableOperation
import pprint
import re
# COMMAND ----------
# MAGIC %md
# MAGIC ## Read data with Spark
# MAGIC Read the data with Spark and convert it to Arrow.
# MAGIC
# MAGIC This also works without Unity Catalog, but the data must fit into the memory of the driver node.
# COMMAND ----------
# read the data with spark
spark_df = spark.read.table("samples.nyctaxi.trips")
# register as a view for the Spark queries below
spark_df.createOrReplaceTempView("trips")
# create an arrow table from the spark dataframe that can be queried using duckdb
# the name of the variable here is the name of the table in duckdb
trips = spark_df.toArrow()
# COMMAND ----------
# MAGIC %md
# MAGIC toArrow() is a new method added in Spark 4.
# MAGIC Spark 4 is not yet released, but Databricks regularly adds new (unreleased) features from the open-source version to their runtime.
# MAGIC
# MAGIC The method works like toPandas(), but instead of returning a pandas DataFrame, it returns an Arrow Table. The same limitations apply: as with toPandas(), all data is loaded into the memory of the driver node, so this will not work for datasets larger than memory.
# MAGIC That said, Arrow is a bit more memory-efficient than pandas, so it might handle somewhat larger datasets, and the conversion itself will definitely be faster.
# MAGIC
# MAGIC Without toArrow(), you would have to do:
# MAGIC
# MAGIC pa.Table.from_batches(spark_df._collect_as_arrow())
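# COMMAND ----------
# A minimal fallback sketch for runtimes without toArrow(): build the Arrow table
# from the collected record batches via the private _collect_as_arrow() helper
# mentioned above (not a public API, so it may change between versions).
# trips_fallback is a name introduced here just for illustration.
trips_fallback = pa.Table.from_batches(spark_df._collect_as_arrow())
duckdb.sql("SELECT COUNT(*) FROM trips_fallback").show()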
# COMMAND ----------
# MAGIC %md
# MAGIC ### Count
# COMMAND ----------
duckdb.sql("SELECT COUNT (*) FROM trips")
# COMMAND ----------
display(
spark.sql("SELECT COUNT (*) FROM trips")
)
# COMMAND ----------
# MAGIC %md
# MAGIC ### Some Groups/Aggregations
# MAGIC Average trip distance and fare amount by pickup zip code
# COMMAND ----------
sql = """
SELECT pickup_zip, avg(trip_distance), avg(fare_amount) FROM trips GROUP BY pickup_zip ORDER BY pickup_zip LIMIT 10
"""
# COMMAND ----------
duckdb.sql(sql)
# COMMAND ----------
display(
spark.sql(sql)
)
# COMMAND ----------
# MAGIC %md
# MAGIC ### From Arrow/DuckDB back to Spark DataFrame
# COMMAND ----------
# MAGIC %md
# MAGIC In Spark 4, createDataFrame() accepts an Arrow Table as argument.
# MAGIC The feature is available from Databricks Runtime 16.x and newer.
# MAGIC For older runtimes, DuckDB can also convert to pandas instead (which is slower).
# COMMAND ----------
# using pandas as intermediate
result = duckdb.sql(sql).df() # df creates a pandas dataframe
display(
spark.createDataFrame(result)
)
# COMMAND ----------
# using arrow as intermediate
# needs Databricks Runtime 16 or newer
result = duckdb.sql(sql).arrow()
display(
spark.createDataFrame(result)
)
# COMMAND ----------
# MAGIC %md
# MAGIC # Read data directly from Unity Catalog
# COMMAND ----------
# MAGIC %md
# MAGIC Instead of using Spark to read from Unity Catalog, we query the underlying data directly with DuckDB.
# MAGIC
# MAGIC This also works outside of Databricks.
# COMMAND ----------
# MAGIC %md
# MAGIC ## Get the temporary credential
# COMMAND ----------
w = WorkspaceClient(
host=spark.conf.get(
"spark.databricks.workspaceUrl"
)
)
def get_temporary_credentials_for_table(table: str):
table_id = w.tables.get(table).table_id
return w.temporary_table_credentials.generate_temporary_table_credentials(table_id=table_id, operation=TableOperation.READ)
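# COMMAND ----------
# Outside of Databricks, the workspace URL cannot be read from the Spark config.
# A minimal sketch of configuring the client explicitly, assuming a personal access
# token; DATABRICKS_HOST and DATABRICKS_TOKEN are placeholder environment variables,
# not something this notebook sets up.
import os
if "DATABRICKS_HOST" in os.environ and "DATABRICKS_TOKEN" in os.environ:
    w = WorkspaceClient(
        host=os.environ["DATABRICKS_HOST"],
        token=os.environ["DATABRICKS_TOKEN"],
    )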
# COMMAND ----------
cred = get_temporary_credentials_for_table("samples.nyctaxi.trips")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Store the credential in DuckDB
# COMMAND ----------
storage_account_name = re.search('@(.*).dfs.', cred.url).group(1)
sql = f"""
SET azure_transport_option_type = 'curl';
CREATE OR REPLACE SECRET (
TYPE AZURE,
CONNECTION_STRING 'AccountName={storage_account_name};SharedAccessSignature={cred.azure_user_delegation_sas.sas_token}'
);
"""
duckdb.sql(sql)
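# COMMAND ----------
# The cell above assumes the table lives in Azure storage (the temporary credential
# carries a SAS token). For workspaces on AWS, the response should carry temporary
# AWS keys instead. A rough sketch of the equivalent DuckDB S3 secret; the
# aws_temp_credentials field names and the region are assumptions to verify against
# the databricks-sdk docs.
if cred.aws_temp_credentials is not None:
    aws = cred.aws_temp_credentials
    duckdb.sql(f"""
    CREATE OR REPLACE SECRET (
        TYPE S3,
        KEY_ID '{aws.access_key_id}',
        SECRET '{aws.secret_access_key}',
        SESSION_TOKEN '{aws.session_token}',
        REGION 'us-east-1'
    );
    """)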
# COMMAND ----------
# MAGIC %md
# MAGIC ## Read the data with DuckDB
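# COMMAND ----------
# delta_scan() comes from DuckDB's delta extension. Recent DuckDB versions usually
# autoload it on first use, but it can also be installed and loaded explicitly
# (a sketch, assuming the cluster can download the extension):
duckdb.sql("INSTALL delta; LOAD delta;")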
# COMMAND ----------
sql = f"""
SELECT * FROM delta_scan("{cred.url}") LIMIT 10;
"""
duckdb.sql(sql).show()