@erikerlandson
Accessing SQL databases via JDBC, using spark, from a cloud-resident jupyter notebook
# SQL JDBC drivers can be pulled in via their maven coordinates, if they are published that way.
# Otherwise they may be installed as naked jars and added to the classpath.
import os
# any other pyspark-shell args can be appended here as well
os.environ['PYSPARK_SUBMIT_ARGS'] = \
    "--packages org.postgresql:postgresql:42.2.8,com.ibm.db2.jcc:db2jcc:db2jcc4 pyspark-shell"
# attach to the spark cluster
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("spark://spark-cluster-poweruser:7077").getOrCreate()
################################################
# First example is a DB2 database
# some auth params for a DB2 database:
DB2_ADDRESS = 'db-host-name'
DB2_PORT = '51000'
DB2_USERNAME = 'user'
DB2_PASSWORD = 'password'
DB2_DBNAME = 'db-name'
# DB2 connection specs generated from the above:
db2_driver = 'com.ibm.db2.jcc.DB2Driver'
db2_jdbc_str = 'jdbc:db2://{host}:{port}/{database}'.format(host=DB2_ADDRESS,
                                                            port=DB2_PORT,
                                                            database=DB2_DBNAME)
db2_prop = {"user": DB2_USERNAME, "password": DB2_PASSWORD,
            "driver": db2_driver, "sslConnection": "false"}
# If you want parallelized/partitioned JDBC queries, you can do it via setting predicates.
# This example function creates n partitions by hashing `db_column_name`.
# note: your base query will also need to select `db_column_name` as one of the fields for this to work
def qpreds(n):
    return ["DBMS_UTILITY.GET_HASH_VALUE(db_column_name, 1, {np}) = {modulus}".format(np=n, modulus=k+1)
            for k in range(n)]
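# For illustration, qpreds(4) yields four mutually exclusive predicates:
#   'DBMS_UTILITY.GET_HASH_VALUE(db_column_name, 1, 4) = 1'
#   'DBMS_UTILITY.GET_HASH_VALUE(db_column_name, 1, 4) = 2'
#   'DBMS_UTILITY.GET_HASH_VALUE(db_column_name, 1, 4) = 3'
#   'DBMS_UTILITY.GET_HASH_VALUE(db_column_name, 1, 4) = 4'
# spark issues one query per predicate, so each one becomes a partition of the read.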
# Here is a query against a DB2 database.
# Note that it reads in parallel partitions, using `qpreds` above.
# Also, DB2's `FETCH FIRST ...` clause is the analogue of postgresql's `LIMIT`.
# The subquery is pushed down to the DB server, and must be aliased as a temp table, `tmp1` here:
spark_dataframe = spark.read.jdbc(
    url=db2_jdbc_str,
    table='(SELECT db_col_1, db_col_2, db_col_3 FROM db_table FETCH FIRST 100000 ROWS ONLY) tmp1',
    properties=db2_prop,
    predicates=qpreds(24))
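# Sanity check (optional): spark creates one partition per predicate, so the
# dataframe built with qpreds(24) should report 24 partitions:
print(spark_dataframe.rdd.getNumPartitions())  # expect: 24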
#####################################
# Next is a postgresql example.
urlPSQL = 'jdbc:postgresql://{host}:{port}/{db}'.format(
    host='postgresql',
    port='5432',
    db='testdb')
propertiesPSQL = {
    "driver": "org.postgresql.Driver",
    "user": "test",
    "password": "test"
}
psql_dataframe = spark.read.jdbc(url=urlPSQL,
                                 properties=propertiesPSQL,
                                 table="db_table_name")
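# The subquery-pushdown trick from the DB2 example works here too; a sketch,
# using postgresql's LIMIT and hypothetical column/table names:
# limited_df = spark.read.jdbc(url=urlPSQL, properties=propertiesPSQL,
#     table='(SELECT db_col_1, db_col_2 FROM db_table_name LIMIT 100000) tmp2')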
# An example of writing a dataframe back into a DB (this one is postgresql).
# mode='overwrite' replaces any existing contents of the target table.
psql_dataframe.write.jdbc(table='output_table_name', mode='overwrite',
                          url=urlPSQL,
                          properties=propertiesPSQL)
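# A variation (a sketch, not from the original gist): mode='append' inserts rows
# without replacing the table, and spark's JDBC 'batchsize' option controls how
# many rows are written per round trip:
# psql_dataframe.write.jdbc(table='output_table_name', mode='append',
#                           url=urlPSQL,
#                           properties=dict(propertiesPSQL, batchsize='10000'))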