Skip to content

Instantly share code, notes, and snippets.

/** Computes the ratio of `parallelism` to `numberOfPartitions` as a floating-point value.
  *
  * The dividend is widened to `Double` before dividing, so the result is not truncated the way
  * an `Int / Int` division would be. Because this is floating-point division, a
  * `numberOfPartitions` of zero does not throw: the result is an infinity, or `NaN` when
  * `parallelism` is zero as well.
  *
  * @param parallelism        the dividend
  * @param numberOfPartitions the divisor
  * @return `parallelism / numberOfPartitions` as a `Double`
  */
def getPartitionStep(parallelism: Int, numberOfPartitions: Int): Double =
  parallelism.toDouble / numberOfPartitions
def getEffectivePartitionKeys(maxParallelism: Int, parallelism: Int, numberOfPartitions: Int): Map[Int, Int] = {
val step = getPartitionStep(parallelism, numberOfPartitions)
@tailrec
def findPartitionEffectiveKey(partition: Int, key: Int): Int = {
val keyRange: KeyGroupRange =
KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
maxParallelism,
parallelism,
@YuvalItzchakov
YuvalItzchakov / ArrayAgg.scala
Created August 23, 2021 11:37
An attempt to create a generic ARRAY_AGG function for Flink
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.dataview.ListView
import org.apache.flink.table.catalog.DataTypeFactory
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.table.types.inference.{ InputTypeStrategies, TypeInference }
import scala.collection.JavaConverters._
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.reflect.ClassTag