Skip to content

Instantly share code, notes, and snippets.

@chiwanpark
Last active August 29, 2015 14:24
Show Gist options
  • Save chiwanpark/5e2a6ac00b7e0bf85444 to your computer and use it in GitHub Desktop.
Save chiwanpark/5e2a6ac00b7e0bf85444 to your computer and use it in GitHub Desktop.
Flink Scala ANY, ALL Example
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConversions._
/**
 * One row of the sample PC data set queried by the example below.
 *
 * Both fields are declared `var`, so instances can be mutated after construction.
 *
 * @param model model identifier of the PC
 * @param price price of the PC
 */
case class PC(
  var model: String,
  var price: Double
)
/**
 * One row of the sample laptop data set queried by the example below.
 *
 * Both fields are declared `var`, so instances can be mutated after construction.
 *
 * @param model model identifier of the laptop
 * @param price price of the laptop
 */
case class Laptop(
  var model: String,
  var price: Double
)
/**
 * One row of the sample product data set queried by the example below.
 *
 * All fields are declared `var`, so instances can be mutated after construction.
 *
 * @param maker name of the maker offering the product
 * @param model model identifier of the product
 * @param price price of the product
 */
case class Product(
  var maker: String,
  var model: String,
  var price: Double
)
/**
 * Shows how SQL `ALL` / `ANY` sub-query predicates can be expressed with the Flink
 * Scala DataSet API: the sub-query result is attached to a rich filter function as a
 * broadcast set, and the predicate is evaluated against that set for every record.
 *
 * An explicit `main` method is used instead of `extends App`, so the data sets are
 * plain local values rather than object fields initialised through `DelayedInit`.
 */
object TestSample {

  /**
   * Builds the sample data sets, applies both filters and prints the two results.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Explicit `.asScala` conversions instead of the implicit JavaConversions views.
    import scala.collection.JavaConverters._

    val env = ExecutionEnvironment.getExecutionEnvironment

    // sample data for PC
    val pcs = env.fromElements(PC("A", 0.5), PC("B", 1.0), PC("C", 1.5), PC("D", 2.0))
    // sample data for Laptop
    val laptops = env.fromElements(Laptop("A", 0.5), Laptop("Q", 5.0))
    // sample data for Product
    val products = env.fromElements(
      Product("MAKER1", "A", 0.5),
      Product("MAKER2", "A", 0.2),
      Product("MAKER3", "Q", 0.5))

    val pcPrices = pcs.map(_.price) // Subquery 1 -> SELECT price FROM PC
    val pcModels = pcs.map(_.model) // Subquery 2 -> SELECT model FROM PC

    // Query 1 -> SELECT model, price FROM laptop WHERE price > ALL (Subquery 1)
    val allFiltered = laptops
      .filter(new RichFilterFunction[Laptop] {
        // Scala-side copy of the "pcPrices" broadcast set, filled in open().
        private var broadcastPrices: List[Double] = Nil

        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          broadcastPrices =
            getRuntimeContext.getBroadcastVariable[Double]("pcPrices").asScala.toList
        }

        // ALL: the laptop price must exceed every broadcast price
        // (vacuously true when the broadcast set is empty).
        override def filter(value: Laptop): Boolean =
          broadcastPrices.forall(value.price > _)
      })
      .withBroadcastSet(pcPrices, "pcPrices")

    // Query 2 -> SELECT DISTINCT maker FROM Product WHERE model > ANY (Subquery 2)
    val anyFiltered = products
      .filter(new RichFilterFunction[Product] {
        // Scala-side copy of the "pcModels" broadcast set, filled in open().
        private var broadcastModels: List[String] = Nil

        override def open(parameters: Configuration): Unit = {
          super.open(parameters)
          broadcastModels =
            getRuntimeContext.getBroadcastVariable[String]("pcModels").asScala.toList
        }

        // ANY: the product model must compare greater than at least one broadcast
        // model (false when the broadcast set is empty).
        override def filter(value: Product): Boolean =
          broadcastModels.exists(value.model > _)
      })
      .withBroadcastSet(pcModels, "pcModels")
      .distinct("maker")
      .map(_.maker)

    allFiltered.print()
    anyFiltered.print()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment