Skip to content

Instantly share code, notes, and snippets.

@sadikovi
Last active August 5, 2021 15:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sadikovi/b5aa436f0413014b5c4ae4f2adb44a8f to your computer and use it in GitHub Desktop.
Save sadikovi/b5aa436f0413014b5c4ae4f2adb44a8f to your computer and use it in GitHub Desktop.
Run SQL queries against a JDBC source from a notebook. For quick debugging: copy-paste the code, set `url` and `props`, then run your queries.
/**
 * Small notebook helper for running ad-hoc SQL statements against a JDBC source and
 * returning the outcome as a Spark DataFrame.
 *
 * Configure the mutable fields (`url`, `props`, `autoCommit`, `spark`) and then call
 * [[sql]]. Every call opens its own connection and closes it before returning.
 */
object DB {
  import org.apache.spark.sql._
  import org.apache.spark.sql.types._
  import scala.util.control.NonFatal

  /** JDBC URL handed to `DriverManager.getConnection`. */
  var url: String = "jdbc:sqlserver://..."

  /** Connection properties (e.g. "user", "password") handed to `DriverManager.getConnection`. */
  var props: java.util.Properties = new java.util.Properties()

  /** When false, [[sql]] commits explicitly on success and rolls back on failure. */
  var autoCommit: Boolean = true

  /**
   * Session used to build result DataFrames. Falls back to the default session when the
   * current thread has no active one, and fails with a descriptive error instead of a bare
   * `NoSuchElementException` when neither exists.
   */
  var spark: SparkSession = SparkSession.getActiveSession
    .orElse(SparkSession.getDefaultSession)
    .getOrElse(throw new IllegalStateException(
      "No active or default SparkSession found; create one before using DB"))

  /**
   * Runs `query` on an already opened connection.
   *
   * @param conn  open JDBC connection; it is not closed by this method
   * @param query SQL text to prepare and execute
   * @return for statements that produce a result set, a DataFrame with one string column per
   *         result column (SQL NULL is kept as null); otherwise a single-row DataFrame with
   *         column "value" containing "OK"
   */
  def execute(conn: java.sql.Connection, query: String): DataFrame = {
    val stmt = conn.prepareStatement(query)
    try {
      val buf = new java.util.ArrayList[Row]()
      if (stmt.execute()) {
        val rs = stmt.getResultSet()
        try {
          val meta = rs.getMetaData
          val cols = meta.getColumnCount
          // getColumnLabel honours "AS" aliases; getColumnName may return the underlying
          // column name instead, depending on the driver.
          val schema = StructType((1 to cols).map { i =>
            StructField(meta.getColumnLabel(i), StringType)
          })
          while (rs.next()) {
            // All values are converted to strings for now. SQL NULL stays null (Spark's
            // representation of a missing value) rather than becoming the text "null".
            val values = (1 to cols).map { i =>
              Option(rs.getObject(i)).map(_.toString).orNull
            }
            buf.add(Row(values: _*))
          }
          spark.createDataFrame(buf, schema)
        } finally {
          rs.close()
        }
      } else {
        // No result set (DDL / DML): report success with a one-cell DataFrame.
        buf.add(Row("OK"))
        val schema = StructType(StructField("value", StringType) :: Nil)
        spark.createDataFrame(buf, schema)
      }
    } finally {
      stmt.close()
    }
  }

  /**
   * Opens a connection using `url` and `props`, runs `query` through [[execute]] and closes
   * the connection.
   *
   * When `autoCommit` is false the statement is committed on success; on failure a rollback
   * is attempted and the original error is rethrown.
   *
   * @param query SQL text to run
   * @return the DataFrame produced by [[execute]]
   */
  def sql(query: String): DataFrame = {
    // Snapshot the flag so commit/rollback decisions agree with the mode the connection was
    // actually opened in, even if the var is reassigned while the query runs.
    val manualCommit = !autoCommit
    val conn = java.sql.DriverManager.getConnection(url, props)
    try {
      // Inside the try so that a failure here still closes the connection.
      conn.setAutoCommit(!manualCommit)
      val ret = execute(conn, query)
      if (manualCommit) conn.commit()
      ret
    } catch {
      case err: Throwable if manualCommit =>
        // Keep the original failure as the primary error; a rollback failure is attached
        // as a suppressed exception instead of replacing it.
        try conn.rollback()
        catch { case NonFatal(rollbackErr) => err.addSuppressed(rollbackErr) }
        throw err
    } finally {
      conn.close()
    }
  }
}
// Usage: point DB at the server, supply credentials, then run each statement in order
// and display its result.
DB.url = "jdbc:sqlserver://..."
Seq("user" -> "ivan", "password" -> "...").foreach { case (key, value) =>
  DB.props.setProperty(key, value)
}
Seq(
  "create table ivan_test (col1 int, col2 int)",
  "insert into ivan_test values (1, 2)",
  "select * from ivan_test order by col1",
  "drop table ivan_test"
).foreach { statement =>
  display(DB.sql(statement))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment