Created
January 12, 2024 04:34
-
-
Save orellabac/0755a8ad079e7c576d0fae98139cca21 to your computer and use it in GitHub Desktop.
Loading a whole table from Databricks (DBX) into Snowflake using JDBC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- ingest_table_from_dbx: copy a Databricks table into a Snowflake table.
--
-- The source table is read page by page through the READ_JDBC table function
-- (not defined in this script; it must already exist in the caller's context),
-- each page is unloaded as a JSON file under @~/<table>/ in the user stage,
-- and the staged files are then loaded into a Snowflake table named after the
-- last segment of qualified_table_name.
--
--   username, password    credentials placed in the JDBC connection settings
--   host                  Databricks workspace host name (port 443 is used)
--   qualified_table_name  source table, e.g. catalog.schema.table
--   page_size             rows requested per remote query
--   max_rows              total row cap; values <= 0 mean "no cap"
create or replace procedure ingest_table_from_dbx(
    username string, password string, host string, qualified_table_name string,
    page_size int default 5000, max_rows int default -1)
returns string language python runtime_version = 3.11
packages =('snowflake-snowpark-python')
handler = 'main' execute as caller as $$
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col, lit, object_construct


def main(session: snowpark.Session, username, password, host, qualified_table_name, page_size, max_rows):
    """Page a Databricks table into the user stage, then load it into Snowflake.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.
        username: User name placed in the JDBC connection settings.
        password: Password/token placed in the JDBC connection settings.
        host: Databricks workspace host name.
        qualified_table_name: Source table; the text after the last "." is
            used as the stage folder and as the target table name.
        page_size: Number of rows requested per remote query.
        max_rows: Upper bound on the rows ingested; <= 0 disables the bound.

    Returns:
        A status message naming the table that was written, or stating that
        nothing was unloaded.
    """
    # Connection settings handed to READ_JDBC as a single OBJECT value.
    # NOTE(review): the warehouse id in HttpPath is hard-coded; confirm it is
    # the intended SQL warehouse for every caller.
    conn_settings = object_construct(
        lit('driver'), lit('com.databricks.client.jdbc.Driver'),
        lit('username'), lit(username),
        lit('password'), lit(password),
        lit('url'), lit(f'jdbc:databricks://{host}:443;HttpPath=/sql/1.0/warehouses/f5023530eac5aaa9'))

    table_name = qualified_table_name.split(".")[-1]
    stage_dir = f"@~/{table_name}/"

    # Clear files left by earlier runs. The path must be interpolated: without
    # the f-prefix the literal "{table_name}" folder was targeted, so stale
    # page files survived and were re-ingested by the final load.
    session.sql(f"rm {stage_dir}").collect()

    # Download the data page by page into files in the stage.
    # NOTE(review): the query pages with LIMIT/OFFSET and no ORDER BY, so it
    # relies on the source returning rows in the same order for every page --
    # confirm this holds, or add an ordering key.
    # NOTE(review): qualified_table_name is interpolated into the remote query
    # as-is; only pass trusted identifiers.
    offset = 0
    while True:
        # Never request more rows than are still allowed by max_rows.
        limit = page_size
        if max_rows > 0:
            limit = min(page_size, max_rows - offset)

        query = f'SELECT * FROM {qualified_table_name} limit {limit} offset {offset}'
        copy_result = session.table_function("READ_JDBC", conn_settings, lit(query))\
            .write.copy_into_location(
                f"{stage_dir}{table_name}_{offset}.json",
                format_type_options={'COMPRESSION': 'NONE'},
                file_format_type="json",
                overwrite=True)

        unloaded = 0
        if copy_result and hasattr(copy_result[0], "rows_unloaded"):
            unloaded = copy_result[0].rows_unloaded or 0
        if unloaded <= 0:
            break

        offset += unloaded
        if unloaded < limit:
            # A short page means the source is exhausted; skip the extra query.
            break
        if max_rows > 0 and offset >= max_rows:
            break

    if offset == 0:
        # Nothing was staged, so there are no files to infer a schema from.
        return f"No rows unloaded from {qualified_table_name}; {table_name} was not written"

    # Now load all staged files, and purge them after ingestion.
    session.read\
        .option("INFER_SCHEMA", True)\
        .option("PURGE", True)\
        .json(stage_dir)\
        .write.save_as_table(table_name, mode="overwrite")
    return f"Data written into {table_name}"
$$;
-- Example run: copy samples.tpch.customer from Databricks in pages of 10000
-- rows. max_rows is not passed, so the procedure's declared default applies.
-- The credential and host values below are placeholders.
CALL ingest_table_from_dbx(
    qualified_table_name => 'samples.tpch.customer',
    host                 => 'adb-xxxx.azuredatabricks.net',
    username             => 'mauricio.rojas@xxxx',
    password             => 'dapixxxx-3',
    page_size            => 10000
);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment