Using mssparkutils.fs.mount to read a file
{
"cells": [
{
"cell_type": "code",
"source": [
"# Get your ADLS linked service info quickly in the Synapse Workspace by\r\n",
"# going to Data -> Linked (tab) -> Azure Data Lake Storage Gen2 -> \r\n",
"# Hover over your ADLS account -> click the three dots -> Edit\r\n",
"# The pane that opens will have all the information you need to fill in below\r\n",
"mssparkutils.fs.mount( \r\n",
" \"abfss://cabattag@cabattagsyn.dfs.core.windows.net/\", # Your ADLS Gen2 account in container@account.fqdn format\r\n",
" \"/udffolder\", # The folder where you want the container mounted (with caveats shown below)\r\n",
" {\"linkedService\":\"cabattag-synapse-WorkspaceDefaultStorage\"} # The linked service that can access the ADLS account\r\n",
") \r\n",
"\r\n",
"# You can also use Shared Access Signatures: https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-file-mount-api#mount-via-shared-access-signature-token-or-account-key"
],
"outputs": [
{
"output_type": "display_data",
"data": {
"application/vnd.livy.statement-meta+json": {
"spark_pool": "cabattagspark",
"session_id": "2",
"statement_id": 2,
"state": "finished",
"livy_statement_state": "available",
"queued_time": "2022-12-09T17:11:37.1620408Z",
"session_start_time": null,
"execution_start_time": "2022-12-09T17:11:37.2959011Z",
"execution_finish_time": "2022-12-09T17:11:58.452884Z",
"spark_jobs": null
},
"text/plain": "StatementMeta(cabattagspark, 2, 2, Finished, Available)"
},
"metadata": {}
},
{
"output_type": "execute_result",
"execution_count": 7,
"data": {
"text/plain": "True"
},
"metadata": {}
}
],
"execution_count": 2,
"metadata": {}
},
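{
"cell_type": "code",
"source": [
"# Optional sanity check (a sketch, not part of the original run):\r\n",
"# mssparkutils.fs.getMountPath resolves the mount point to its local\r\n",
"# /synfs/{jobId}/... path for this session, and fs.ls confirms the\r\n",
"# container contents are visible through the mount.\r\n",
"print(mssparkutils.fs.getMountPath(\"/udffolder\"))\r\n",
"mssparkutils.fs.ls(\"synfs:/%s/udffolder\" % mssparkutils.env.getJobId())"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},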
{
"cell_type": "markdown",
"source": [
"The important part of this code is that we:\r\n",
"1. Append /synfs before the path\r\n",
"2. Get the job ID and also add it to the path\r\n",
"\r\n",
"The example file contains the string:\r\n",
"`thereareseveralwordsinthisdocumentcurrentlybutiamgoingtojustgrabafewcharacterstoshowhowtheudfworks`"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
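{
"cell_type": "code",
"source": [
"# If the example file doesn't exist yet, one way to create it is through the\r\n",
"# mount with mssparkutils.fs.put. The UDFTest folder and udfread.txt names\r\n",
"# are assumptions chosen to match the read path used further below.\r\n",
"jobId = mssparkutils.env.getJobId()\r\n",
"mssparkutils.fs.put(\r\n",
"    \"synfs:/%s/udffolder/UDFTest/udfread.txt\" % jobId,\r\n",
"    \"thereareseveralwordsinthisdocumentcurrentlybutiamgoingtojustgrabafewcharacterstoshowhowtheudfworks\",\r\n",
"    True # overwrite if the file already exists\r\n",
")"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},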
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import udf\r\n",
"from pyspark.sql.types import StringType\r\n",
"\r\n",
"@udf(returnType=StringType()) \r\n",
"def getStringFromFile(start, end, filepath):\r\n",
" f = open(filepath,\"r\")\r\n",
" udftext = f.read()\r\n",
" f.close()\r\n",
" return udftext[start:end]"
],
"outputs": [
{
"output_type": "display_data",
"data": {
"application/vnd.livy.statement-meta+json": {
"spark_pool": "cabattagspark",
"session_id": "2",
"statement_id": 9,
"state": "finished",
"livy_statement_state": "available",
"queued_time": "2022-12-09T17:17:44.8324248Z",
"session_start_time": null,
"execution_start_time": "2022-12-09T17:17:44.9866721Z",
"execution_finish_time": "2022-12-09T17:17:45.165198Z",
"spark_jobs": null
},
"text/plain": "StatementMeta(cabattagspark, 2, 9, Finished, Available)"
},
"metadata": {}
}
],
"execution_count": 9,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
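{
"cell_type": "markdown",
"source": [
"Note: the UDF above opens and reads the whole file once per row. That's fine\r\n",
"for a small demo file like this one, but for larger files or DataFrames you\r\n",
"would likely want to read the file once and pass the contents (or a broadcast\r\n",
"variable) into the UDF instead."
],
"metadata": {}
},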
{
"cell_type": "code",
"source": [
"# Make an example data frame\r\n",
"columns = [\"beginning\",\"end\"]\r\n",
"data = [(0, 5),\r\n",
" (8, 15),\r\n",
" (68, 78)]\r\n",
"\r\n",
"df = spark.createDataFrame(data=data,schema=columns)\r\n",
"df.show(truncate=False)"
],
"outputs": [
{
"output_type": "display_data",
"data": {
"application/vnd.livy.statement-meta+json": {
"spark_pool": "cabattagspark",
"session_id": "2",
"statement_id": 13,
"state": "finished",
"livy_statement_state": "available",
"queued_time": "2022-12-09T17:18:50.8729117Z",
"session_start_time": null,
"execution_start_time": "2022-12-09T17:18:51.0191859Z",
"execution_finish_time": "2022-12-09T17:18:51.587048Z",
"spark_jobs": null
},
"text/plain": "StatementMeta(cabattagspark, 2, 13, Finished, Available)"
},
"metadata": {}
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---------+---+\n|beginning|end|\n+---------+---+\n|0 |5 |\n|8 |15 |\n|68 |78 |\n+---------+---+\n\n"
]
}
],
"execution_count": 13,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import col, lit\r\n",
"\r\n",
"jobId = mssparkutils.env.getJobId()\r\n",
"# Creating our file path in the format /synfs/{jobId}/{mountFolder}/{fileName}\r\n",
"filepath = \"/synfs/%s/udffolder/UDFTest/%s\" %(jobId, \"udfread.txt\")\r\n",
"\r\n",
"#Use the UDF from above to look up the characters in the file and add them as a column\r\n",
"df.withColumn(\"foundstring\", getStringFromFile(col(\"beginning\"), col(\"end\"), lit(filepath))) \\\r\n",
" .show(truncate=False)"
],
"outputs": [
{
"output_type": "display_data",
"data": {
"application/vnd.livy.statement-meta+json": {
"spark_pool": "cabattagspark",
"session_id": "2",
"statement_id": 14,
"state": "finished",
"livy_statement_state": "available",
"queued_time": "2022-12-09T17:18:55.7199648Z",
"session_start_time": null,
"execution_start_time": "2022-12-09T17:18:55.8515636Z",
"execution_finish_time": "2022-12-09T17:18:56.9680253Z",
"spark_jobs": null
},
"text/plain": "StatementMeta(cabattagspark, 2, 14, Finished, Available)"
},
"metadata": {}
},
{
"output_type": "stream",
"name": "stdout",
"text": [
"+---------+---+-----------+\n|beginning|end|foundstring|\n+---------+---+-----------+\n|0 |5 |there |\n|8 |15 |several |\n|68 |78 |characters |\n+---------+---+-----------+\n\n"
]
}
],
"execution_count": 14,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
}
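,
{
"cell_type": "code",
"source": [
"# Cleanup sketch (not part of the original run): unmount the container\r\n",
"# when you're done with it.\r\n",
"mssparkutils.fs.unmount(\"/udffolder\")"
],
"outputs": [],
"execution_count": null,
"metadata": {}
}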
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"language": "Python",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"kernel_info": {
"name": "synapse_pyspark"
},
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}