Created
August 8, 2018 11:59
-
-
Save asardaes/8331a117210d4e08139c66c86e8c952d to your computer and use it in GitHub Desktop.
[Flink][Scala] JDBCInputFormat with SplitDataProperties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"nodes": [ | |
{ | |
"id": 4, | |
"type": "source", | |
"pact": "Data Source", | |
"contents": "at org.apache.flink.api.scala.ExecutionEnvironment.createInput(ExecutionEnvironment.scala:390) (org.apache.flink.api.java.io.jdbc.JDBCInputFormat)", | |
"parallelism": "24", | |
"global_properties": [ | |
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, | |
{ "name": "Partitioning Order", "value": "(none)" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"local_properties": [ | |
{ "name": "Order", "value": "(none)" }, | |
{ "name": "Grouping", "value": "not grouped" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"estimates": [ | |
{ "name": "Est. Output Size", "value": "(unknown)" }, | |
{ "name": "Est. Cardinality", "value": "(unknown)" } ], | |
"costs": [ | |
{ "name": "Network", "value": "0.0" }, | |
{ "name": "Disk I/O", "value": "0.0" }, | |
{ "name": "CPU", "value": "0.0" }, | |
{ "name": "Cumulative Network", "value": "0.0" }, | |
{ "name": "Cumulative Disk I/O", "value": "0.0" }, | |
{ "name": "Cumulative CPU", "value": "0.0" } | |
], | |
"compiler_hints": [ | |
{ "name": "Output Size (bytes)", "value": "(none)" }, | |
{ "name": "Output Cardinality", "value": "(none)" }, | |
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, | |
{ "name": "Filter Factor", "value": "(none)" } ] | |
}, | |
{ | |
"id": 3, | |
"type": "pact", | |
"pact": "GroupCombine", | |
"contents": "GroupCombine at hello.flink.scala.Main$.main(Main.scala:56)", | |
"parallelism": "24", | |
"predecessors": [ | |
{"id": 4, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} | |
], | |
"driver_strategy": "Sorted Combine", | |
"global_properties": [ | |
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, | |
{ "name": "Partitioning Order", "value": "(none)" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"local_properties": [ | |
{ "name": "Order", "value": "(none)" }, | |
{ "name": "Grouping", "value": "not grouped" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"estimates": [ | |
{ "name": "Est. Output Size", "value": "(unknown)" }, | |
{ "name": "Est. Cardinality", "value": "(unknown)" } ], | |
"costs": [ | |
{ "name": "Network", "value": "0.0" }, | |
{ "name": "Disk I/O", "value": "0.0" }, | |
{ "name": "CPU", "value": "0.0" }, | |
{ "name": "Cumulative Network", "value": "0.0" }, | |
{ "name": "Cumulative Disk I/O", "value": "0.0" }, | |
{ "name": "Cumulative CPU", "value": "0.0" } | |
], | |
"compiler_hints": [ | |
{ "name": "Output Size (bytes)", "value": "(none)" }, | |
{ "name": "Output Cardinality", "value": "(none)" }, | |
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, | |
{ "name": "Filter Factor", "value": "(none)" } ] | |
}, | |
{ | |
"id": 2, | |
"type": "pact", | |
"pact": "GroupCombine", | |
"contents": "SUM(3)", | |
"parallelism": "24", | |
"predecessors": [ | |
{"id": 3, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} | |
], | |
"driver_strategy": "Sorted Combine", | |
"global_properties": [ | |
{ "name": "Partitioning", "value": "RANDOM_PARTITIONED" }, | |
{ "name": "Partitioning Order", "value": "(none)" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"local_properties": [ | |
{ "name": "Order", "value": "(none)" }, | |
{ "name": "Grouping", "value": "not grouped" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"estimates": [ | |
{ "name": "Est. Output Size", "value": "(unknown)" }, | |
{ "name": "Est. Cardinality", "value": "(unknown)" } ], | |
"costs": [ | |
{ "name": "Network", "value": "0.0" }, | |
{ "name": "Disk I/O", "value": "0.0" }, | |
{ "name": "CPU", "value": "0.0" }, | |
{ "name": "Cumulative Network", "value": "0.0" }, | |
{ "name": "Cumulative Disk I/O", "value": "0.0" }, | |
{ "name": "Cumulative CPU", "value": "0.0" } | |
], | |
"compiler_hints": [ | |
{ "name": "Output Size (bytes)", "value": "(none)" }, | |
{ "name": "Output Cardinality", "value": "(none)" }, | |
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, | |
{ "name": "Filter Factor", "value": "(none)" } ] | |
}, | |
{ | |
"id": 1, | |
"type": "pact", | |
"pact": "GroupReduce", | |
"contents": "SUM(3)", | |
"parallelism": "24", | |
"predecessors": [ | |
{"id": 2, "ship_strategy": "Hash Partition on [0, 1, 2]", "local_strategy": "Sort (combining) on [0:ASC,1:ASC,2:ASC]", "exchange_mode": "PIPELINED"} | |
], | |
"driver_strategy": "Sorted Group Reduce", | |
"global_properties": [ | |
{ "name": "Partitioning", "value": "HASH_PARTITIONED" }, | |
{ "name": "Partitioned on", "value": "[0, 1, 2]" }, | |
{ "name": "Partitioning Order", "value": "(none)" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"local_properties": [ | |
{ "name": "Order", "value": "[0:ASC,1:ASC,2:ASC]" }, | |
{ "name": "Grouped on", "value": "[0, 1, 2]" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"estimates": [ | |
{ "name": "Est. Output Size", "value": "(unknown)" }, | |
{ "name": "Est. Cardinality", "value": "(unknown)" } ], | |
"costs": [ | |
{ "name": "Network", "value": "(unknown)" }, | |
{ "name": "Disk I/O", "value": "(unknown)" }, | |
{ "name": "CPU", "value": "(unknown)" }, | |
{ "name": "Cumulative Network", "value": "(unknown)" }, | |
{ "name": "Cumulative Disk I/O", "value": "(unknown)" }, | |
{ "name": "Cumulative CPU", "value": "(unknown)" } | |
], | |
"compiler_hints": [ | |
{ "name": "Output Size (bytes)", "value": "(none)" }, | |
{ "name": "Output Cardinality", "value": "(none)" }, | |
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, | |
{ "name": "Filter Factor", "value": "(none)" } ] | |
}, | |
{ | |
"id": 0, | |
"type": "sink", | |
"pact": "Data Sink", | |
"contents": "hello.flink.scala.Main$$anon$3@226a82c4", | |
"parallelism": "24", | |
"predecessors": [ | |
{"id": 1, "ship_strategy": "Forward", "exchange_mode": "PIPELINED"} | |
], | |
"global_properties": [ | |
{ "name": "Partitioning", "value": "HASH_PARTITIONED" }, | |
{ "name": "Partitioned on", "value": "[0, 1, 2]" }, | |
{ "name": "Partitioning Order", "value": "(none)" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"local_properties": [ | |
{ "name": "Order", "value": "[0:ASC,1:ASC,2:ASC]" }, | |
{ "name": "Grouped on", "value": "[0, 1, 2]" }, | |
{ "name": "Uniqueness", "value": "not unique" } | |
], | |
"estimates": [ | |
{ "name": "Est. Output Size", "value": "(unknown)" }, | |
{ "name": "Est. Cardinality", "value": "(unknown)" } ], | |
"costs": [ | |
{ "name": "Network", "value": "0.0" }, | |
{ "name": "Disk I/O", "value": "0.0" }, | |
{ "name": "CPU", "value": "0.0" }, | |
{ "name": "Cumulative Network", "value": "(unknown)" }, | |
{ "name": "Cumulative Disk I/O", "value": "(unknown)" }, | |
{ "name": "Cumulative CPU", "value": "(unknown)" } | |
], | |
"compiler_hints": [ | |
{ "name": "Output Size (bytes)", "value": "(none)" }, | |
{ "name": "Output Cardinality", "value": "(none)" }, | |
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, | |
{ "name": "Filter Factor", "value": "(none)" } ] | |
} | |
] | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Builds the JDBC input format that reads rows from Vertica.
  *
  * Each produced row has three fields, in this order: a `String`, a SQL
  * `Timestamp` and a `Long`.
  *
  * NOTE(review): an earlier version of this method created a throw-away
  * `DataSource` on a freshly obtained Java `ExecutionEnvironment`, declared
  * split data properties on it (splits partitioned by field 0, each split
  * ordered ascending by field 1) and then discarded that `DataSource`. Split
  * data properties belong to the `DataSource`, not to the input format, so
  * they never reached the plan built from the returned format. To give the
  * optimizer those hints, they must be declared on the `DataSource` that is
  * actually part of the executed program (`DataSource#getSplitDataProperties`).
  *
  * @param query      SQL statement handed to the input format; its
  *                   placeholders are presumably filled by
  *                   `getParameterValuesProvider` (defined elsewhere; confirm)
  * @param dbUrl      JDBC URL of the database
  * @param properties connection settings; the `user` and `password` entries are read
  * @return the fully configured [[JDBCInputFormat]]
  */
private def getInputFormat(query: String, dbUrl: String, properties: Properties): JDBCInputFormat = {
  val fieldTypes = Seq(
    BasicTypeInfo.STRING_TYPE_INFO,
    SqlTimeTypeInfo.TIMESTAMP,
    BasicTypeInfo.LONG_TYPE_INFO
  )
  val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)

  JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.vertica.jdbc.Driver")
    .setDBUrl(dbUrl)
    .setUsername(properties.getProperty("user"))
    .setPassword(properties.getProperty("password"))
    .setQuery(query)
    .setRowTypeInfo(rowTypeInfo)
    .setParametersProvider(getParameterValuesProvider)
    .finish()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Query text handed to the JDBC input format. Every entry below is one line of
// the final statement; entries are joined with "\n" and a trailing "\n" is
// appended. The single `?` placeholder compares against `hostname` and is
// presumably bound per split by the parameter values provider (confirm).
val query = Seq(
  "SELECT DISTINCT",
  " hostname,",
  s" TIME_SLICE(timestamp, $windowSize)::TIMESTAMPTZ AS ts,",
  " id",
  "FROM",
  " my_table",
  "WHERE",
  s" timestamp BETWEEN $dateBetween",
  " AND",
  " hostname = ?",
  "ORDER BY",
  " ts ASC"
).mkString("", "\n", "\n")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment