Skip to content

Instantly share code, notes, and snippets.

@orellabac
Created July 10, 2024 21:00
Show Gist options
  • Save orellabac/8e7fd90746ce664d9df2a14878a8a243 to your computer and use it in GitHub Desktop.
Save orellabac/8e7fd90746ce664d9df2a14878a8a243 to your computer and use it in GitHub Desktop.
Example of a function to run arbitrary scripts in snowpark from a notebook
Display the source blob
Display the rendered blob
Raw
{
"metadata": {
"kernelspec": {
"display_name": "Streamlit Notebook",
"name": "streamlit"
}
},
"nbformat_minor": 5,
"nbformat": 4,
"cells": [
{
"cell_type": "code",
"id": "dc6f824f-30bb-49c0-b072-300f25c6e581",
"metadata": {
"language": "sql",
"name": "creating_a_sample_file",
"collapsed": false
},
"outputs": [],
"source": "\nCOPY INTO @MYSTAGE/APP.py\nFROM (\nSELECT $$\nimport sys\nfrom snowflake.snowpark.context import get_active_session \nsession = get_active_session() \nsession.sql(f\"create or replace table mytest as select 1 A, 2 B, '{sys.argv[1]}' C\").show()\n$$\n)\nFILE_FORMAT=(\nTYPE=CSV\nCOMPRESSION=NONE\nRECORD_DELIMITER=NONE\nFIELD_DELIMITER=NONE\n)\nOVERWRITE=TRUE\nSINGLE=TRUE;",
"execution_count": null
},
{
"cell_type": "code",
"id": "76bda2b5-44de-487c-9a91-086e00a308d4",
"metadata": {
"language": "sql",
"name": "execute_immediate_py",
"collapsed": false
},
"outputs": [],
"source": "CREATE OR REPLACE PROCEDURE EXECUTE_IMMEDIATE_PY(FILE STRING, ARGS ARRAY DEFAULT NULL, packages ARRAY DEFAULT NULL, SYNC BOOLEAN DEFAULT TRUE) RETURNS VARCHAR LANGUAGE PYTHON\nRUNTIME_VERSION =3.11\npackages=('snowflake-snowpark-python')\nhandler='main'\nas\n$$\nimport sys,os,json\nfrom snowflake.snowpark import session\ndef main(session,file,args,packages, sync):\n simple_file_name = os.path.basename(file)\n args = args if args else []\n args = json.dumps(args)\n packages = list(set(packages + ['snowflake-snowpark-python'])) if packages else ['snowflake-snowpark-python','pandas','numpy']\n packages = [f\"'{x}'\" for x in packages]\n DOLLAR = \"$\"\n template = f\"\"\"\nwith anonymous1 as procedure() returns varchar language python \nruntime_version = 3.11\npackages=({\",\".join(packages)})\nimports=('{file}')\nhandler = 'main'\nas {DOLLAR}{DOLLAR}\nfrom snowflake.snowpark import session\nimport sys, os, importlib, types, logging\nimport importlib.machinery\ndef load_and_run_script(script_path):\n IMPORT_DIRECTORY_NAME = \"snowflake_import_directory\"\n import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]\n loader = importlib.machinery.SourceFileLoader(\"dynamic_script\", os.path.join(import_dir,script_path))\n mod = types.ModuleType(loader.name)\n loader.exec_module(mod)\ndef main(session):\n sys.argv = {args}\n logging.info(f\"Running script {file}\")\n load_and_run_script(\"{simple_file_name}\")\n return \"done\"\n{DOLLAR}{DOLLAR}\ncall anonymous1();\n\"\"\"\n if sync:\n return str(session.sql(template).collect())\n else:\n async_job = session.sql(template).collect_nowait()\n return async_job.query_id\n$$;\n\n",
"execution_count": null
},
{
"cell_type": "code",
"id": "90a1fe4e-313d-4e4d-94a0-2f4457b77dde",
"metadata": {
"language": "sql",
"name": "execute_sync",
"collapsed": false
},
"outputs": [],
"source": "call EXECUTE_IMMEDIATE_PY('@MYSTAGE/APP.py',['arg1','arg2','arg3'])",
"execution_count": null
},
{
"cell_type": "code",
"id": "ce7d8c54-b9e4-47c0-b9f2-a8ed83201d6a",
"metadata": {
"language": "sql",
"name": "execute_async",
"collapsed": false
},
"outputs": [],
"source": "call EXECUTE_IMMEDIATE_PY(FILE=>'@MYSTAGE/APP.py',ARGS=>['arg1','arg2','arg3'],sync=>FALSE)",
"execution_count": null
}
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment