Using mssparkutils.fs.mount to read a file
{
  "cells": [
    {
      "cell_type": "code",
      "source": [
        "# Get your ADLS linked service info quickly in the Synapse Workspace by\r\n",
        "# going to Data -> Linked (tab) -> Azure Data Lake Storage Gen2 ->\r\n",
        "# Hover over your ADLS account -> click the three dots -> Edit\r\n",
        "# The pane that opens will have all the information you need to fill in below\r\n",
        "mssparkutils.fs.mount(\r\n",
        "    \"abfss://cabattag@cabattagsyn.dfs.core.windows.net/\", # Your ADLS Gen2 account in container@account.fqdn format\r\n",
        "    \"/udffolder\", # The folder where you want the container mounted (with caveats shown below)\r\n",
        "    {\"linkedService\":\"cabattag-synapse-WorkspaceDefaultStorage\"} # The linked service that can access the ADLS account\r\n",
        ")\r\n",
        "\r\n",
        "# You can also use Shared Access Signatures: https://learn.microsoft.com/en-us/azure/synapse-analytics/spark/synapse-file-mount-api#mount-via-shared-access-signature-token-or-account-key"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cabattagspark",
              "session_id": "2",
              "statement_id": 2,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-09T17:11:37.1620408Z",
              "session_start_time": null,
              "execution_start_time": "2022-12-09T17:11:37.2959011Z",
              "execution_finish_time": "2022-12-09T17:11:58.452884Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cabattagspark, 2, 2, Finished, Available)"
          },
          "metadata": {}
        },
        {
          "output_type": "execute_result",
          "execution_count": 7,
          "data": {
            "text/plain": "True"
          },
          "metadata": {}
        }
      ],
      "execution_count": 2,
      "metadata": {}
    },
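    {
      "cell_type": "markdown",
      "source": [
        "*Added note (not part of the original gist):* per the docs linked above, `mssparkutils.fs.mount` can also authenticate with a SAS token or account key instead of a linked service. The sketch below assumes the SAS token is stored in Azure Key Vault; the vault name (`myvault`) and secret name (`mysecret`) are placeholders for your own resources."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "# Sketch: mount the same container with a SAS token instead of a linked service.\r\n",
        "# \"myvault\" and \"mysecret\" are placeholder names, not resources from this gist.\r\n",
        "sas_token = mssparkutils.credentials.getSecret(\"myvault\", \"mysecret\")\r\n",
        "mssparkutils.fs.mount(\r\n",
        "    \"abfss://cabattag@cabattagsyn.dfs.core.windows.net/\",\r\n",
        "    \"/udffolder\",\r\n",
        "    {\"sasToken\": sas_token}\r\n",
        ")"
      ],
      "outputs": [],
      "execution_count": null,
      "metadata": {}
    },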
    {
      "cell_type": "markdown",
      "source": [
        "The important parts of the path-building code below are that we:\r\n",
        "1. Prepend /synfs to the path\r\n",
        "2. Get the job ID and include it in the path as well\r\n",
        "\r\n",
        "The example file contains the string:\r\n",
        "`thereareseveralwordsinthisdocumentcurrentlybutiamgoingtojustgrabafewcharacterstoshowhowtheudfworks`"
      ],
      "metadata": {
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      }
    },
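    {
      "cell_type": "markdown",
      "source": [
        "*Added note (not part of the original gist):* before wiring the path into a UDF, you can sanity-check the mount. A minimal sketch: `mssparkutils.fs.head` can preview a file through the `synfs:` scheme, which uses the same `{jobId}/{mount folder}` layout as the local `/synfs/...` path used below."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "# Sketch: preview the example file via the synfs: scheme to confirm the mount.\r\n",
        "# The path pieces (udffolder/UDFTest/udfread.txt) match the rest of this notebook.\r\n",
        "jobId = mssparkutils.env.getJobId()\r\n",
        "print(mssparkutils.fs.head(\"synfs:/%s/udffolder/UDFTest/udfread.txt\" % jobId))"
      ],
      "outputs": [],
      "execution_count": null,
      "metadata": {}
    },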
    {
      "cell_type": "code",
      "source": [
        "from pyspark.sql.functions import udf\r\n",
        "from pyspark.sql.types import StringType\r\n",
        "\r\n",
        "@udf(returnType=StringType())\r\n",
        "def getStringFromFile(start, end, filepath):\r\n",
        "    # Runs on the executors: read the mounted file and slice out the\r\n",
        "    # requested character range. Note this re-reads the file for every row.\r\n",
        "    with open(filepath, \"r\") as f:\r\n",
        "        udftext = f.read()\r\n",
        "    return udftext[start:end]"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cabattagspark",
              "session_id": "2",
              "statement_id": 9,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-09T17:17:44.8324248Z",
              "session_start_time": null,
              "execution_start_time": "2022-12-09T17:17:44.9866721Z",
              "execution_finish_time": "2022-12-09T17:17:45.165198Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cabattagspark, 2, 9, Finished, Available)"
          },
          "metadata": {}
        }
      ],
      "execution_count": 9,
      "metadata": {
        "jupyter": {
          "source_hidden": false,
          "outputs_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      }
    },
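    {
      "cell_type": "markdown",
      "source": [
        "*Added note (not part of the original gist):* the UDF above re-opens the file for every row. For larger inputs, one alternative sketch is to read the file once on the driver and let the UDF close over the string (or broadcast it). This assumes the same mount and file as the rest of the notebook."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "from pyspark.sql.functions import udf\r\n",
        "from pyspark.sql.types import StringType\r\n",
        "\r\n",
        "# Sketch: read the file once on the driver, then slice the captured string per row.\r\n",
        "jobId = mssparkutils.env.getJobId()\r\n",
        "with open(\"/synfs/%s/udffolder/UDFTest/udfread.txt\" % jobId, \"r\") as f:\r\n",
        "    filetext = f.read()\r\n",
        "\r\n",
        "@udf(returnType=StringType())\r\n",
        "def getStringFromText(start, end):\r\n",
        "    # filetext is captured in the closure and shipped to the executors with the task\r\n",
        "    return filetext[start:end]"
      ],
      "outputs": [],
      "execution_count": null,
      "metadata": {}
    },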
    {
      "cell_type": "code",
      "source": [
        "# Make an example DataFrame\r\n",
        "columns = [\"beginning\",\"end\"]\r\n",
        "data = [(0, 5),\r\n",
        "        (8, 15),\r\n",
        "        (68, 78)]\r\n",
        "\r\n",
        "df = spark.createDataFrame(data=data, schema=columns)\r\n",
        "df.show(truncate=False)"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cabattagspark",
              "session_id": "2",
              "statement_id": 13,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-09T17:18:50.8729117Z",
              "session_start_time": null,
              "execution_start_time": "2022-12-09T17:18:51.0191859Z",
              "execution_finish_time": "2022-12-09T17:18:51.587048Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cabattagspark, 2, 13, Finished, Available)"
          },
          "metadata": {}
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "+---------+---+\n|beginning|end|\n+---------+---+\n|0        |5  |\n|8        |15 |\n|68       |78 |\n+---------+---+\n\n"
          ]
        }
      ],
      "execution_count": 13,
      "metadata": {
        "jupyter": {
          "source_hidden": false,
          "outputs_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from pyspark.sql.functions import col, lit\r\n",
        "\r\n",
        "jobId = mssparkutils.env.getJobId()\r\n",
        "# Creating our file path in the format /synfs/{jobId}/{mountFolder}/{fileName}\r\n",
        "filepath = \"/synfs/%s/udffolder/UDFTest/%s\" % (jobId, \"udfread.txt\")\r\n",
        "\r\n",
        "# Use the UDF from above to look up the characters in the file and add them as a column\r\n",
        "df.withColumn(\"foundstring\", getStringFromFile(col(\"beginning\"), col(\"end\"), lit(filepath))) \\\r\n",
        "    .show(truncate=False)"
      ],
      "outputs": [
        {
          "output_type": "display_data",
          "data": {
            "application/vnd.livy.statement-meta+json": {
              "spark_pool": "cabattagspark",
              "session_id": "2",
              "statement_id": 14,
              "state": "finished",
              "livy_statement_state": "available",
              "queued_time": "2022-12-09T17:18:55.7199648Z",
              "session_start_time": null,
              "execution_start_time": "2022-12-09T17:18:55.8515636Z",
              "execution_finish_time": "2022-12-09T17:18:56.9680253Z",
              "spark_jobs": null
            },
            "text/plain": "StatementMeta(cabattagspark, 2, 14, Finished, Available)"
          },
          "metadata": {}
        },
        {
          "output_type": "stream",
          "name": "stdout",
          "text": [
            "+---------+---+-----------+\n|beginning|end|foundstring|\n+---------+---+-----------+\n|0        |5  |there      |\n|8        |15 |several    |\n|68       |78 |characters |\n+---------+---+-----------+\n\n"
          ]
        }
      ],
      "execution_count": 14,
      "metadata": {
        "jupyter": {
          "source_hidden": false,
          "outputs_hidden": false
        },
        "nteract": {
          "transient": {
            "deleting": false
          }
        }
      }
    },
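    {
      "cell_type": "markdown",
      "source": [
        "*Added note (not part of the original gist):* when you are done with the data, the mount can be released with `mssparkutils.fs.unmount`, passing the mount point used above."
      ],
      "metadata": {}
    },
    {
      "cell_type": "code",
      "source": [
        "# Sketch: unmount the folder once the job no longer needs it.\r\n",
        "mssparkutils.fs.unmount(\"/udffolder\")"
      ],
      "outputs": [],
      "execution_count": null,
      "metadata": {}
    }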
  ],
  "metadata": {
    "kernelspec": {
      "name": "synapse_pyspark",
      "language": "Python",
      "display_name": "Synapse PySpark"
    },
    "language_info": {
      "name": "python"
    },
    "kernel_info": {
      "name": "synapse_pyspark"
    },
    "save_output": true,
    "synapse_widget": {
      "version": "0.1",
      "state": {}
    }
  },
  "nbformat": 4,
  "nbformat_minor": 2
}