Last active: November 13, 2019 23:50
Save erikerlandson/755848b2baf2ea34c98a1f4292a7567e to your computer and use it in GitHub Desktop.
Accessing SQL databases via JDBC, using spark, from a cloud-resident jupyter notebook
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SQL JDBC drivers can be downloaded from maven coords, if they are exposed that way.
# Otherwise they may be installed as naked jars and added to the path
import os

# BUG FIX: SparkSession was used below but never imported in the original snippet.
from pyspark.sql import SparkSession

# Must be set before the SparkSession is created so spark-submit picks up the
# JDBC driver packages. Any other pyspark-shell args can go in this string too.
# BUG FIX: the original put a comment line between the '=' and the string,
# immediately after a backslash line-continuation -- that is a SyntaxError in
# Python (a backslash must be the last character on its line).
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    """--packages 'org.postgresql:postgresql:42.2.8,com.ibm.db2.jcc:db2jcc:db2jcc4' pyspark-shell"""

# attach to the spark cluster
spark = SparkSession.builder.master("spark://spark-cluster-poweruser:7077").getOrCreate()
################################################
# First example is a DB2 database
# Connection/auth parameters for the DB2 instance:
DB2_ADDRESS = 'db-host-name'
DB2_PORT = '51000'
DB2_USERNAME = 'user'
DB2_PASSWORD = 'password'
DB2_DBNAME = 'db-name'

# DB2 connection specs derived from the parameters above:
db2_driver = 'com.ibm.db2.jcc.DB2Driver'
db2_jdbc_str = f'jdbc:db2://{DB2_ADDRESS}:{DB2_PORT}/{DB2_DBNAME}'
db2_prop = {
    "user": DB2_USERNAME,
    "password": DB2_PASSWORD,
    "driver": db2_driver,
    "sslConnection": "false",
}
# If you want parallelized/partitioned JDBC queries, you can do it via setting predicates.
# This function creates n partitions by hashing a DB column.
# note: your base query must also select `column` as one of its fields for this to work
# NOTE(review): DBMS_UTILITY.GET_HASH_VALUE is an Oracle package, although the
# surrounding comments describe a DB2 database -- confirm the function exists
# on the target server (e.g. via an Oracle-compatibility mode).
def qpreds(n, column='db_column_name'):
    """Return n mutually exclusive WHERE-clause predicates for spark.read.jdbc.

    With base=1 and hash_size=n, GET_HASH_VALUE yields values in 1..n, and the
    predicates test one bucket each (k+1 for k in 0..n-1), so together they
    cover every row exactly once.

    n      -- number of partitions/predicates to generate
    column -- DB column to hash (default keeps the originally hard-coded
              column name, so existing callers are unaffected)
    """
    return ["DBMS_UTILITY.GET_HASH_VALUE({col}, 1, {np}) = {bucket}".format(col=column, np=n, bucket=k + 1)
            for k in range(n)]
# Here is a query against a DB2 database.
# Note it generates parallel partitions using `qpreds` above
# Also, the `FETCH FIRST...` clause is like postgresql `LIMIT` for DB2
# This table query is pushed onto the DB server, and has to be attached to a temp table, `tmp1` here:
db2_query = '(SELECT db_col_1, db_col_2, db_col_3 FROM db_table FETCH FIRST 100000 ROWS ONLY) tmp1'
spark_dataframe = spark.read.jdbc(
    url=db2_jdbc_str,
    table=db2_query,
    properties=db2_prop,
    predicates=qpreds(24))
#####################################
# next is a postgresql example
PSQL_HOST = 'postgresql'
PSQL_PORT = '5432'
PSQL_DB = 'testdb'

# postgresql connection specs, mirroring the DB2 section above:
urlPSQL = f"jdbc:postgresql://{PSQL_HOST}:{PSQL_PORT}/{PSQL_DB}"
propertiesPSQL = {
    "driver": "org.postgresql.Driver",
    "user": 'test',
    "password": 'test',
}
# Pull `db_table_name` from the postgresql server into a spark dataframe
# (no predicates here, so this read is a single partition).
psql_dataframe = spark.read.jdbc(
    table="db_table_name",
    url=urlPSQL,
    properties=propertiesPSQL)
# An example of writing back into a DB (this one is postgresql).
# BUG FIX: the original referenced `spark_df`, which is never defined in this
# file; the dataframe loaded above is `spark_dataframe`, so write that one.
# mode='overwrite' replaces any existing contents of `output_table_name`.
spark_dataframe.write.jdbc(
    table='output_table_name',
    mode='overwrite',
    url=urlPSQL,
    properties=propertiesPSQL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.