Skip to content

Instantly share code, notes, and snippets.

@asardaes
Created August 8, 2018 11:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asardaes/8331a117210d4e08139c66c86e8c952d to your computer and use it in GitHub Desktop.
Save asardaes/8331a117210d4e08139c66c86e8c952d to your computer and use it in GitHub Desktop.
[Flink][Scala] JDBCInputFormat with SplitDataProperties
{
"nodes": [
{
"id": 4,
"type": "source",
"pact": "Data Source",
"contents": "at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)",
"parallelism": "24",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 3,
"type": "pact",
"pact": "GroupCombine",
"contents": "GroupCombine at hello.flink.scala.Main$.main(Main.scala:56)",
"parallelism": "24",
"predecessors": [
{"id": 4, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
],
"driver_strategy": "Sorted Combine",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 2,
"type": "pact",
"pact": "GroupCombine",
"contents": "SUM(3)",
"parallelism": "24",
"predecessors": [
{"id": 3, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
],
"driver_strategy": "Sorted Combine",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 1,
"type": "pact",
"pact": "GroupReduce",
"contents": "SUM(3)",
"parallelism": "24",
"predecessors": [
{"id": 2, "ship_strategy": "Hash Partition on [0, 1, 2]", "local_strategy": "Sort (combining) on [0:ASC,1:ASC,2:ASC]", "exchange_mode": "PIPELINED"}
],
"driver_strategy": "Sorted Group Reduce",
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0, 1, 2]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC,1:ASC,2:ASC]" },
{ "name": "Grouped on", "value": "[0, 1, 2]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 0,
"type": "sink",
"pact": "Data Sink",
"contents": "hello.flink.scala.Main$$anon$3@226a82c4",
"parallelism": "24",
"predecessors": [
{"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"}
],
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0, 1, 2]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC,1:ASC,2:ASC]" },
{ "name": "Grouped on", "value": "[0, 1, 2]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
}
]
}
private def getInputFormat(query: String, dbUrl: String, properties: Properties): JDBCInputFormat = {
val fieldTypes = Seq(
BasicTypeInfo.STRING_TYPE_INFO,
SqlTimeTypeInfo.TIMESTAMP,
BasicTypeInfo.LONG_TYPE_INFO
)
val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
val inputFormat = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.vertica.jdbc.Driver")
.setDBUrl(dbUrl)
.setUsername(properties.getProperty("user"))
.setPassword(properties.getProperty("password"))
.setQuery(query)
.setRowTypeInfo(rowTypeInfo)
.setParametersProvider(getParameterValuesProvider)
.finish()
val javaEnv = org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo, "example")
val sdp = dataSource.getSplitDataProperties
sdp.splitsPartitionedBy(0)
sdp.splitsOrderedBy(Array(1), Array(Order.ASCENDING))
inputFormat
}
val query = "SELECT DISTINCT\n" +
" hostname,\n" +
s" TIME_SLICE(timestamp, $windowSize)::TIMESTAMPTZ AS ts,\n" +
" id\n" +
"FROM\n" +
" my_table\n" +
"WHERE\n" +
s" timestamp BETWEEN $dateBetween\n" +
" AND\n" +
" hostname = ?\n" +
"ORDER BY\n" +
" ts ASC\n"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment