@cbattlegear
Created December 19, 2022 16:52
Showing a Scala-based UDF being called from PySpark in a Synapse Notebook
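The gist contains only the notebook; the companion Scala project it calls into is not included. For context, here is a minimal sketch of what the `supersimpleudf.NewUDF` object might look like. Only the package, object, and method names come from the notebook comments below; the body, the SQL name `addOne`, and the `Int => Int` signature are assumptions inferred from the outputs (every value comes back incremented by one).

```scala
// Hypothetical reconstruction of the Scala object the notebook calls.
// Only supersimpleudf, NewUDF, and registerUdf appear in the gist;
// everything else here is an assumption.
package supersimpleudf

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.UserDefinedFunction

object NewUDF {
  def registerUdf(): UserDefinedFunction = {
    val spark = SparkSession.builder.getOrCreate()
    // register() returns the UserDefinedFunction, which is what lets the
    // PySpark side call .apply on the value returned by registerUdf().
    spark.udf.register("addOne", (x: Int) => x + 1)
  }
}
```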
```python
# This runs our initial registration process to add the UDF to Spark.
# Note: the namespaces here are dependent on what you created in IntelliJ.
# The first item after _jvm is the "package" line from your Scala code,
# and the second item after that is the object name from your Scala code.
spark._jvm.supersimpleudf.NewUDF.registerUdf()
```

Output:

```
StatementMeta(cbattagler, 0, 1, Finished, Available)
JavaObject id=o1712
```
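A quick way to confirm the registration stuck (not in the original gist, and assuming the Scala side registered the function under the name `addOne` that the next cell relies on) is to look for it in the session catalog:

```python
# Sanity check: the UDF should now be visible in the session catalog
# under its SQL name, "addOne" (assumed from the SQL used below).
print([f.name for f in spark.catalog.listFunctions() if f.name == "addOne"])
```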
```python
# Once registered, we are able to use the UDF directly in Spark SQL.

from pyspark.sql import Row

# This creates a testing DataFrame
olddf = spark.createDataFrame([
    Row(a=1, b=4),
    Row(a=2, b=4),
    Row(a=3, b=4),
    Row(a=4, b=4),
    Row(a=5, b=4),
    Row(a=6, b=4),
    Row(a=7, b=4),
])

# Adds it as a temp view in Spark SQL
olddf.createOrReplaceTempView("olddf")

# And runs our UDF "addOne", assigning the results to a new DataFrame
df = spark.sql("SELECT addOne(a) FROM olddf")
df.show()
```

Output:

```
StatementMeta(cbattagler, 0, 2, Finished, Available)

+---------+
|addOne(a)|
+---------+
|        2|
|        3|
|        4|
|        5|
|        6|
|        7|
|        8|
+---------+
```
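Because the UDF is registered by name, the temp view is optional. As a sketch of an alternative using only stock PySpark APIs, `pyspark.sql.functions.expr` can compile the same SQL fragment directly against the DataFrame:

```python
from pyspark.sql.functions import expr

# Equivalent to the spark.sql() call above, but without creating a temp
# view: expr() parses the SQL fragment, so the Scala UDF registered as
# "addOne" is resolved by name exactly as in a full SQL statement.
df = olddf.select(expr("addOne(a)").alias("aplus"))
df.show()
```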
```python
# Using the UDF directly from PySpark is more involved.
from pyspark.sql.column import Column, _to_java_column, _to_seq

# Create a testing DataFrame again
newdf = spark.createDataFrame([
    Row(a=1, b=4),
    Row(a=2, b=4),
    Row(a=3, b=4),
    Row(a=4, b=4),
    Row(a=5, b=4),
    Row(a=6, b=4),
    Row(a=7, b=4),
])

# Create a function to interface with the UDF directly
def add_one_udf(a):
    # Assign our UDF to a variable we can call (see above for the dot-naming convention)
    addOneUDF = spark._jvm.supersimpleudf.NewUDF.registerUdf()
    # Return our data as a Column from the UDF. The apply method runs the UDF.
    # The _to_seq function converts our data to a JVM sequence (what the UDF needs);
    # [a] is the column data we send, and _to_java_column is the converter used.
    return Column(addOneUDF.apply(_to_seq(spark, [a], _to_java_column)))

# Finally, we run our UDF against our DataFrame and create a new output
df = newdf.select(add_one_udf("a").alias("aplus"))

df.show()
```

Output:

```
StatementMeta(cbattagler, 0, 3, Finished, Available)

+-----+
|aplus|
+-----+
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
+-----+
```
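One design note on the wrapper above: `registerUdf()` is re-invoked every time `add_one_udf` is called. A variation (same assumptions as the original cell) fetches the JVM handle once and closes over it:

```python
# Variation: grab the JVM UserDefinedFunction handle once, up front,
# rather than calling registerUdf() on every invocation of the wrapper.
add_one_jvm_udf = spark._jvm.supersimpleudf.NewUDF.registerUdf()

def add_one_udf(a):
    # _to_seq builds the JVM sequence of columns that apply() expects.
    return Column(add_one_jvm_udf.apply(_to_seq(spark, [a], _to_java_column)))
```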