Skip to content

Instantly share code, notes, and snippets.

@seralf
Last active February 28, 2023 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seralf/808c9dbe1d05ebfcd9db0d672a64817c to your computer and use it in GitHub Desktop.
Save seralf/808c9dbe1d05ebfcd9db0d672a64817c to your computer and use it in GitHub Desktop.
Naive Mock for JDBC in scala
package examples
import java.sql.DriverManager
import db.jdbc.JDBC
import com.typesafe.config.ConfigFactory
object MainIgnite extends App {
// SEE: https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteJdbcThinDriver.html
val db = JDBC("""
jdbc {
driver: "org.apache.ignite.IgniteJdbcThinDriver"
dsn: "jdbc:ignite:thin://localhost/"
}
""")
db.execute_update(QUERY.q_ddl)
// generate some test data...
for (i <- 0 to 10000) {
val j = i + 10
val q = s"""
INSERT INTO City (id, name) VALUES (${j}, 'City n° ${i}') ;
INSERT INTO Person (id, name, city_id) VALUES (1, 'Person n° ${i}', ${j}) ;
"""
println("SQL> insert " + i)
db.execute_update(q)
}
db.execute_query(QUERY.q_example_01, false).get
.zipWithIndex
.foreach {
case (item, i) =>
println(i, item)
}
db.stop()
}
object QUERY {
def q_ddl = """
DROP TABLE IF EXISTS City
;
CREATE TABLE IF NOT EXISTS City (
id LONG PRIMARY KEY,
name VARCHAR
) WITH "template=replicated"
;
DROP TABLE IF EXISTS Person
;
CREATE TABLE IF NOT EXISTS Person (
id LONG,
name VARCHAR,
city_id LONG,
PRIMARY KEY (id, city_id)
) WITH "backups=1, affinityKey=city_id"
;
DROP INDEX IF EXISTS idx_city_name
;
CREATE INDEX IF NOT EXISTS idx_city_name ON City (name)
;
DROP INDEX IF EXISTS idx_person_name
;
CREATE INDEX idx_person_name ON Person (name)
;
----
INSERT INTO City (id, name) VALUES (1, 'Forest Hill');
INSERT INTO City (id, name) VALUES (2, 'Denver');
INSERT INTO City (id, name) VALUES (3, 'St. Petersburg');
INSERT INTO Person (id, name, city_id) VALUES (1, 'John Doe', 3);
INSERT INTO Person (id, name, city_id) VALUES (2, 'Jane Roe', 2);
INSERT INTO Person (id, name, city_id) VALUES (3, 'Mary Major', 1);
INSERT INTO Person (id, name, city_id) VALUES (4, 'Richard Miles', 2);
"""
def q_example_01 = """
SELECT P.name AS person_name, C.name AS city_name
FROM Person AS P
JOIN City AS C
ON ( P.city_id = C.id )
"""
}
package db.jdbc
import com.typesafe.config.Config
import java.sql.DriverManager
import com.typesafe.config.ConfigFactory
import scala.util.Random
import java.sql.Connection
import scala.util.Try
import scala.collection.mutable.LinkedHashMap
import scala.collection.mutable.LinkedList
import scala.collection.mutable.ListBuffer
import java.sql.ResultSet
import java.sql.Statement
import org.slf4j.LoggerFactory
object JDBC {
def apply(config_txt: String) = {
val db = new JDBC(ConfigFactory.parseString(config_txt))
db.start()
db
}
}
class JDBC(config: Config) {
val logger = LoggerFactory.getLogger(this.getClass)
val conf = config.withFallback(ConfigFactory.parseString("""
jdbc {
user: ""
pwd: ""
pool: 1
}
""")).resolve()
Class.forName(conf.getString("jdbc.driver"))
private var conns: Seq[Connection] = Seq()
def conn = Random.shuffle(conns).head
def start() {
if (conns.isEmpty)
conns = for (i <- 0 to conf.getInt("jdbc.pool") - 1) yield DriverManager.getConnection(conf.getString("jdbc.dsn"))
logger.debug(s"initialized ${conf.getInt("jdbc.pool")} connections")
}
def stop() {
if (!conns.isEmpty)
conns.foreach(_.close())
logger.debug(s"closed ${conf.getInt("jdbc.pool")} connections")
}
def execute_update(query: String): Try[Any] = Try {
val st = this.conn.createStatement()
val results = st.executeUpdate(query)
st.closeOnCompletion()
results
}
def execute_query(query: String, use_stream: Boolean = false): Try[Seq[Map[String, Any]]] = Try {
import utils.wrappers._
implicit val conn = this.conn
usingQuery(query, use_stream) { rs =>
val meta = rs.getMetaData
val cols = for (n <- 1 to meta.getColumnCount) yield meta.getColumnName(n)
def it = new Iterator[ResultSet] {
def has_next() = rs.next()
def hasNext: Boolean = has_next()
def next(): ResultSet = rs
}
.zipWithIndex
.map {
case (rset, i) =>
logger.debug(s"SQL> item $i")
LinkedHashMap(cols.map(n => (n, rset.getObject(n))): _*).toMap
}.toSeq
if (use_stream)
it.toStream
else
it.toList
}
}
}
package utils {
object wrappers {
implicit class ResultSetWrapper(resultset: ResultSet) {
def toStream() = {
}
}
def using[C <: { def close(): Unit }, OUT](resource: C)(action_on_resource: C => OUT): OUT =
try {
action_on_resource(resource)
} finally {
resource.close()
}
def usingSQLConnection[C <: Connection, OUT](conn: C)(action_on_connection: C => OUT): OUT =
try {
action_on_connection(conn)
} finally {
println("SQL> connection - close")
conn.close()
}
def usingStatement[C <: Statement, OUT](statement: C, use_stream: Boolean = false)(action_on_statement: C => OUT): OUT = {
try {
action_on_statement(statement)
} finally {
println("SQL> statement - close")
if (!use_stream) statement.close()
}
}
def usingResultSet[RS <: ResultSet, OUT](rs: RS, use_stream: Boolean = false)(action_on_resultset: RS => OUT): OUT = {
try {
action_on_resultset(rs)
} finally {
println("SQL> resultset - close")
if (!use_stream) rs.close()
}
}
def usingQuery[C <: Connection, OUT](query: String, use_stream: Boolean = false)(action_on_resultset: ResultSet => OUT)(implicit conn: C): OUT = {
usingStatement(conn.createStatement(), use_stream) { st =>
usingResultSet(st.executeQuery(query), use_stream) { rs =>
action_on_resultset(rs)
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment