custom_sql_in_spark.sql
# Set up a JDBC connection to your database
# Get the credentials from the Azure Key Vault for which a secret scope was defined
# using the Databricks CLI. See: https://docs.azuredatabricks.net/security/secrets/secret-scopes.html
# Alternatively, hard-code the credentials (see the commented sketch below)
jdbcUrl = dbutils.secrets.get(scope="JDBC", key="url")
jdbcUsername = dbutils.secrets.get(scope="JDBC", key="username")
jdbcPassword = dbutils.secrets.get(scope="JDBC", key="password")
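
# A purely illustrative hard-coded alternative, with placeholder values that are not
# part of the original gist; the URL format shown is the Azure SQL Database one:
# jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"
# jdbcUsername = "<username>"
# jdbcPassword = "<password>"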

# Create a dictionary with the credentials that we can pass to the Spark DataFrame reader
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
}
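
# Depending on the cluster setup, you may also need to name the JDBC driver class
# explicitly. A minimal sketch, assuming the Microsoft SQL Server driver (an
# assumption, not part of the original gist):
# connectionProperties["driver"] = "com.microsoft.sqlserver.jdbc.SQLServerDriver"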

# Optional: create a sample dataset by calling a public API
import requests
import pandas as pd

response = requests.get("http://api.open-notify.org/astros.json")
response.raise_for_status()  # fail fast on HTTP errors instead of parsing a bad response
data = response.json()

df = pd.DataFrame(data["people"])
spark_df = spark.createDataFrame(df)
spark_df.write.jdbc(url=jdbcUrl, table='PeopleInSpace', mode='overwrite', properties=connectionProperties)
display(spark_df)

# Add a sessionInitStatement to the connection properties: Spark runs this custom SQL
# statement on every new JDBC connection, before it starts reading data
connectionProperties['sessionInitStatement'] = "DELETE FROM PeopleInSpace WHERE name='Alexander Skvortsov';"
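
# The same option can also be set through the DataFrameReader API instead of the
# properties dict; a minimal commented sketch, equivalent to the line above:
# spark.read.format("jdbc") \
#     .option("url", jdbcUrl) \
#     .option("dbtable", "(select 1 as nr) as t") \
#     .option("user", jdbcUsername) \
#     .option("password", jdbcPassword) \
#     .option("sessionInitStatement", "DELETE FROM PeopleInSpace WHERE name='Alexander Skvortsov';") \
#     .load()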

# Force a Spark read so that the sessionInitStatement is executed
# The dummy pushdown query only serves to open a JDBC session; its result does not
# matter, because if the DELETE fails we get a Spark exception instead of a result
rc = spark.read.jdbc(url=jdbcUrl, table='(select 1 as nr) as t', properties=connectionProperties).count()

# Read the table back to verify that the row was deleted
spark_df_after_delete = spark.read.jdbc(url=jdbcUrl, table='PeopleInSpace', properties=connectionProperties)
display(spark_df_after_delete)
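
# A hypothetical follow-up check (not in the original gist): count the rows that still
# match the deleted name; 0 confirms that the sessionInitStatement ran
remaining = spark_df_after_delete.filter("name = 'Alexander Skvortsov'").count()
print(f"Rows still matching the deleted name: {remaining}")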