Skip to content

Instantly share code, notes, and snippets.

@zeitgeist
Created June 22, 2016 22:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zeitgeist/b91a60460661618ca4585e082895c616 to your computer and use it in GitHub Desktop.
Save zeitgeist/b91a60460661618ca4585e082895c616 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.io.RichInputFormat
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.typeutils.RowTypeInfo
import org.apache.flink.core.io.InputSplit
/**
 * Created by ms on 21/06/16.
 *
 * Small Flink batch program: it configures a `JDBCInputFormat` for a PostgreSQL
 * query, turns it into a data set and prints the result.
 *
 * Two variants of the input format are exposed: [[jdbcIssue]], which is what
 * `main` runs, and [[jdbcNoIssue]], which is the same format behind a cast.
 */
object JDBCIssue {

  // JDBC driver class name and connection settings handed to the input format builder.
  val DB_DRIVER: String = "org.postgresql.Driver"
  val DB_URI: String = "jdbc:postgresql://localhost:5432/postgres"
  val DB_USER: String = "postgres"
  val DB_PASS: String = ""

  // Query executed by the input format; it selects the single column `id`.
  val DB_QUERY: String = "SELECT id FROM events LIMIT 10;"

  // Row layout matching the query: one field named "id" typed as INT_TYPE_INFO.
  val DB_ROWTYPE: RowTypeInfo = new RowTypeInfo(Seq(BasicTypeInfo.INT_TYPE_INFO), Seq("id"))

  /**
   * Builds a new JDBC input format from the driver, URL, credentials, query and
   * row type declared in this object.
   *
   * This is a `def`, so every call runs the builder again.
   *
   * @return the input format produced by the builder's `finish()`
   */
  def jdbcIssue: JDBCInputFormat = {
    val configuredBuilder = JDBCInputFormat
      .buildJDBCInputFormat()
      .setDrivername(DB_DRIVER)
      .setDBUrl(DB_URI)
      .setUsername(DB_USER)
      .setPassword(DB_PASS)
      .setQuery(DB_QUERY)
      .setRowTypeInfo(DB_ROWTYPE)

    configuredBuilder.finish()
  }

  /** A `Row` subclass fixed to a single field; used only as a type argument in [[jdbcNoIssue]]. */
  class MyRow extends Row(1)

  /**
   * Returns the format built by [[jdbcIssue]], cast to
   * `RichInputFormat[MyRow, InputSplit] with ResultTypeQueryable[Row]`.
   *
   * NOTE(review): judging by the name, this cast is meant to work around the problem
   * that [[jdbcIssue]] reproduces; `main` does not call it — confirm the intent.
   */
  def jdbcNoIssue = {
    val format = jdbcIssue
    format.asInstanceOf[RichInputFormat[MyRow, InputSplit] with ResultTypeQueryable[Row]]
  }

  /**
   * Entry point: obtains the execution environment, creates an input from
   * [[jdbcIssue]] and prints it.
   *
   * @param args command-line arguments (not read)
   */
  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    val rows = environment.createInput(jdbcIssue)
    rows.print()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment