Skip to content

Instantly share code, notes, and snippets.

@docete
Last active August 14, 2019 07:34
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save docete/8e78ff8b5d0df69f60dda547780101f1 to your computer and use it in GitHub Desktop.
Save docete/8e78ff8b5d0df69f60dda547780101f1 to your computer and use it in GitHub Desktop.
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.examples.scala
import java.sql.Timestamp
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import scala.collection.mutable
/**
 * Example program that runs a time-windowed join between two in-memory
 * streams using the Table API / SQL.
 *
 * Orders and payments are joined on `orderId`, keeping only the pairs whose
 * payment time lies between the order time and one hour after it. The result
 * is converted to an append stream and printed.
 *
 * Usage: `TimeWindowedJoin [--planner old|blink]`
 */
object TimeWindowedJoin {

  // *************************************************************************
  // PROGRAM
  // *************************************************************************

  /**
   * Entry point of the example.
   *
   * @param args command line arguments; `--planner <old|blink>` selects the
   *             planner (matched case-insensitively). When the option is
   *             absent the old planner is used.
   */
  def main(args: Array[String]): Unit = {
    // Checking input parameters
    val params = ParameterTool.fromArgs(args)
    val planner =
      if (params.has("planner")) {
        params.get("planner")
      } else {
        println("Execute TimeWindowedJoin with default old planner.")
        println("Use --planner to specify planner (old/blink).")
        "old"
      }

    val builder = EnvironmentSettings.newInstance
    builder.inStreamingMode
    // Match case-insensitively so that "Blink" / "OLD" select the expected planner.
    planner.toLowerCase(java.util.Locale.ROOT) match {
      case "old" => builder.useOldPlanner
      case "blink" => builder.useBlinkPlanner
      case _ =>
        // Keep the builder untouched, but make the ignored value visible to the user.
        Console.err.println(
          s"Unknown planner '$planner' (expected old or blink); using the default planner.")
    }
    val settings = builder.build()

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env, settings)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // prepare orders and payments: (orderId, productName | paymentType, event time)
    val ordersData = Seq(
      ("001", "iphone", new Timestamp(1545800002000L)),
      ("002", "mac", new Timestamp(1545800003000L)),
      ("003", "book", new Timestamp(1545800004000L)),
      ("004", "cpu", new Timestamp(1545800018000L)))

    // The payment for order "003" is 3606 seconds after its order time, i.e.
    // outside the one hour window used by the query below.
    val paymentsData = Seq(
      ("001", "alipay", new Timestamp(1545803501000L)),
      ("002", "card", new Timestamp(1545803602000L)),
      ("003", "card", new Timestamp(1545803610000L)),
      ("004", "alipay", new Timestamp(1545803611000L)))

    // The third tuple field is used as the event timestamp and exposed as the
    // rowtime attribute of each table.
    val orders = env
      .fromCollection(ordersData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)

    val payments = env
      .fromCollection(paymentsData)
      .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
      .toTable(tEnv, 'orderId, 'paymentType, 'paymentTime.rowtime)

    tEnv.registerTable("Orders", orders)
    tEnv.registerTable("Payments", payments)

    // Join each order with its payment when the payment time falls within
    // [orderTime, orderTime + 1 hour].
    val sqlQuery =
      """
        |SELECT
        | o.orderId,
        | o.productName,
        | p.paymentType,
        | CAST(o.orderTime AS TIMESTAMP),
        | CAST(p.paymentTime AS TIMESTAMP)
        |FROM
        | Orders AS o,
        | Payments AS p
        |WHERE
        | o.orderId = p.orderId AND
        | p.paymentTime BETWEEN o.orderTime AND o.orderTime + INTERVAL '1' HOUR
      """.stripMargin

    tEnv.sqlQuery(sqlQuery).toAppendStream[Row].print()

    env.execute("TimeWindowedJoin")
  }

  /**
   * Timestamp extractor for 3-tuples whose last field carries the event time.
   * Watermarks are generated with a bounded out-of-orderness of 10 seconds.
   *
   * @tparam T1 type of the first tuple field
   * @tparam T2 type of the second tuple field
   */
  class TimestampExtractor[T1, T2]
    extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {

    /**
     * Extracts the timestamp from the given element.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The value of the element's third field as returned by `Timestamp.getTime`.
     */
    override def extractTimestamp(element: (T1, T2, Timestamp)): Long = element._3.getTime
  }
}
@docete
Copy link
Author

docete commented Aug 13, 2019

Time Windowed Join Demo with flink 1.10 (6a63b34463ce9fd1f66b0a26a599423d6500ee17)

@docete
Copy link
Author

docete commented Aug 13, 2019

Results should be (ordering is non-deterministic):
2> 002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22
9> 001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41
11> 004,cpu,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31

@docete
Copy link
Author

docete commented Aug 13, 2019

dependencies:

<dependencies>
	<!-- Table ecosystem -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
		<version>${project.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
		<version>${project.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
		<version>${project.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
		<version>${project.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
		<version>${project.version}</version>
	</dependency>
</dependencies>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment