Loading a whole table from Databricks (DBX) into Snowflake over JDBC
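The stored procedure below pages through the source table with LIMIT/OFFSET over JDBC, unloads each page as an uncompressed JSON file into the user stage, then bulk-loads all staged files into a Snowflake table with schema inference, purging the files after ingestion. Note that READ_JDBC is assumed to be a table function already available in the account (for example, a UDTF wrapping the Databricks JDBC driver); it is not a built-in Snowflake function.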
create or replace procedure ingest_table_from_dbx(
    username string,
    password string,
    host string,
    qualified_table_name string,
    page_size int default 5000,
    max_rows int default -1)
returns string
language python
runtime_version = '3.11'
packages = ('snowflake-snowpark-python')
handler = 'main'
execute as caller
as $$
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import lit, object_construct

def main(session: snowpark.Session, username, password, host, qualified_table_name, page_size, max_rows):
    # JDBC connection settings for the Databricks SQL warehouse
    # (the HttpPath below is hardcoded to one warehouse and would normally be parameterized)
    conn_settings = object_construct(
        lit('driver'), lit('com.databricks.client.jdbc.Driver'),
        lit('username'), lit(username),
        lit('password'), lit(password),
        lit('url'), lit(f'jdbc:databricks://{host}:443;HttpPath=/sql/1.0/warehouses/f5023530eac5aaa9'))
    offset = 0
    keep = True
    table_name = qualified_table_name.split(".")[-1]
    # clear any existing files for this table from the user stage
    session.sql(f"rm @~/{table_name}/").show()
    # loop: download the table one page at a time and unload each page
    # to a JSON file in the user stage
    while keep:
        query = f'SELECT * FROM {qualified_table_name} limit {page_size} offset {offset}'
        copy_result = session.table_function("READ_JDBC", conn_settings, lit(query))\
            .write.copy_into_location(f"@~/{table_name}/{table_name}_{offset}.json",
                                      file_format_type="json",
                                      format_type_options={'COMPRESSION': 'NONE'},
                                      overwrite=True)
        if hasattr(copy_result[0], "rows_unloaded") and copy_result[0].rows_unloaded > 0:
            keep = True
            offset = offset + copy_result[0].rows_unloaded
        else:
            # an empty page means the source table is exhausted
            keep = False
        if max_rows > 0 and offset >= max_rows:
            keep = False
    # now load all staged files into a table, inferring the schema,
    # and purge the files after ingestion
    session.read\
        .option("INFER_SCHEMA", True)\
        .option("PURGE", True)\
        .json(f"@~/{table_name}/")\
        .write.save_as_table(table_name, mode="overwrite")
    return f"Data written into {table_name}"
$$;
call ingest_table_from_dbx(
    username => 'mauricio.rojas@xxxx',
    password => 'dapixxxx-3',
    host => 'adb-xxxx.azuredatabricks.net',
    qualified_table_name => 'samples.tpch.customer',
    page_size => 10000);
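A quick sanity check after the call (a minimal sketch: the target table name is the last component of qualified_table_name, so 'samples.tpch.customer' lands in a table named CUSTOMER in the current schema):

-- compare against the source row count in Databricks
select count(*) from customer;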