Created
December 19, 2022 16:52
-
-
Save cbattlegear/b59bf091359e51478ee989ec3c14cd46 to your computer and use it in GitHub Desktop.
Showing a scala based UDF being called from PySpark in a Synapse Notebook
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "# This runs our initial registration process to add the UDF to Spark\r\n",
        "from pyspark import SparkConf, SparkContext\r\n",
        "\r\n",
        "# Note the namespaces here are dependent on what you created in IntelliJ\r\n",
        "# The first item after _jvm is the \"package\" line from your scala code\r\n",
        "# The second item after that is the Object name from your scala code\r\n",
        "spark._jvm.supersimpleudf.NewUDF.registerUdf()"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cbattagler",
              "session_id": "0",
              "statement_id": 1,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-19T16:44:47.8415925Z",
              "session_start_time": "2022-12-19T16:44:47.8704199Z",
              "execution_start_time": "2022-12-19T16:48:15.3562432Z",
              "execution_finish_time": "2022-12-19T16:48:15.9213929Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cbattagler, 0, 1, Finished, Available)"
          },
          "metadata": {}
        },
        {
          "output_type": "execute_result",
          "execution_count": 5,
          "data": {
            "text/plain": "JavaObject id=o1712"
          },
          "metadata": {}
        }
      ],
      "execution_count": 1,
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "# Once added we are able to directly use the UDF in Spark SQL\r\n",
        "\r\n",
        "from pyspark.sql import Row\r\n",
        "\r\n",
        "# This creates a testing DataFrame\r\n",
        "olddf = spark.createDataFrame([\r\n",
        " Row(a=1, b=4),\r\n",
        " Row(a=2, b=4),\r\n",
        " Row(a=3, b=4),\r\n",
        " Row(a=4, b=4),\r\n",
        " Row(a=5, b=4),\r\n",
        " Row(a=6, b=4),\r\n",
        " Row(a=7, b=4),\r\n",
        "])\r\n",
        "\r\n",
        "# Adds it as a temp view into Spark SQL\r\n",
        "olddf.createOrReplaceTempView(\"olddf\")\r\n",
        "\r\n",
        "# And runs our UDF \"addOne\" assigning the results to a new Data Frame\r\n",
        "df = spark.sql(\"SELECT addOne(a) FROM olddf\")\r\n",
        "df.show()"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cbattagler",
              "session_id": "0",
              "statement_id": 2,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-19T16:44:47.8430828Z",
              "session_start_time": null,
              "execution_start_time": "2022-12-19T16:48:16.0337868Z",
              "execution_finish_time": "2022-12-19T16:49:01.9679248Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cbattagler, 0, 2, Finished, Available)"
          },
          "metadata": {}
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "+---------+\n|addOne(a)|\n+---------+\n| 2|\n| 3|\n| 4|\n| 5|\n| 6|\n| 7|\n| 8|\n+---------+\n\n"
          ]
        }
      ],
      "execution_count": 2,
      "metadata": {
        "jupyter": {
          "source_hidden": false,
          "outputs_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "# Using this directly from PySpark becomes more complex\r\n",
        "from pyspark.sql.column import Column, _to_java_column, _to_seq \r\n",
        "\r\n",
        "# Create a testing data frame again\r\n",
        "newdf = spark.createDataFrame([\r\n",
        " Row(a=1, b=4),\r\n",
        " Row(a=2, b=4),\r\n",
        " Row(a=3, b=4),\r\n",
        " Row(a=4, b=4),\r\n",
        " Row(a=5, b=4),\r\n",
        " Row(a=6, b=4),\r\n",
        " Row(a=7, b=4),\r\n",
        "])\r\n",
        "\r\n",
        "# Create a function to directly interface with the UDF\r\n",
        "def add_one_udf(a): \r\n",
        " # Assign our UDF to a variable we can call (see above for dot naming conventions)\r\n",
        " addOneUDF = spark._jvm.supersimpleudf.NewUDF.registerUdf()\r\n",
        " # Return our data as a column from the UDF. The apply method runs the UDF.\r\n",
        " # The _to_seq function converts our data to a JVM sequence (what the UDF needs)\r\n",
        " # [a] is the column data we sent, and _to_java_column is the convertor used.\r\n",
        " return Column(addOneUDF.apply(_to_seq(spark, [a], _to_java_column)))\r\n",
        "\r\n",
        "# Finally we run our UDF against our data frame and create a new output\r\n",
        "df = newdf.select(add_one_udf(\"a\").alias(\"aplus\"))\r\n",
        "\r\n",
        "df.show()"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cbattagler",
              "session_id": "0",
              "statement_id": 3,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-19T16:44:47.8497009Z",
              "session_start_time": null,
              "execution_start_time": "2022-12-19T16:49:02.0824743Z",
              "execution_finish_time": "2022-12-19T16:49:03.199664Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cbattagler, 0, 3, Finished, Available)"
          },
          "metadata": {}
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "+-----+\n|aplus|\n+-----+\n| 2|\n| 3|\n| 4|\n| 5|\n| 6|\n| 7|\n| 8|\n+-----+\n\n"
          ]
        }
      ],
      "execution_count": 3,
      "metadata": {
        "jupyter": {
          "source_hidden": false,
          "outputs_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      }
    }
  ],
  "metadata": {
    "language_info": {
      "name": "python"
    },
    "kernelspec": {
      "name": "synapse_pyspark",
      "language": "Python",
      "display_name": "Synapse PySpark"
    },
    "kernel_info": {
      "name": "synapse_pyspark"
    },
    "save_output": true,
    "synapse_widget": {
      "version": "0.1",
      "state": {}
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.