Skip to content

Instantly share code, notes, and snippets.

@YuvalItzchakov
Created November 4, 2021 07:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b to your computer and use it in GitHub Desktop.
Save YuvalItzchakov/9441a4a0e80609e534e69804e94cb57b to your computer and use it in GitHub Desktop.
/** Returns the share of operator parallelism that corresponds to a single batch partition,
  * i.e. `parallelism / numberOfPartitions` evaluated in floating point.
  *
  * Example: `getPartitionStep(4, 8) == 0.5`.
  *
  * Because the division is done on `Double`s it never throws. A `numberOfPartitions` of 0
  * produces a non-finite value (`Infinity`, or `NaN` when `parallelism` is also 0).
  *
  * @param parallelism        the operator's parallelism
  * @param numberOfPartitions the number of batch partitions
  * @return `parallelism` divided by `numberOfPartitions`
  */
def getPartitionStep(parallelism: Int, numberOfPartitions: Int): Double = {
  val numerator = parallelism.toDouble
  val denominator = numberOfPartitions.toDouble
  numerator / denominator
}
/** Chooses, for every batch partition, an integer key that Flink's key-group assignment routes
  * to the operator subtask associated with that partition.
  *
  * Partitions `0 until numberOfPartitions` are spread proportionally over the `parallelism`
  * operator indices: partition `p` maps to index `p * parallelism / numberOfPartitions`
  * (integer division). For each partition the method scans upward from a starting key. It picks
  * the first key whose key group (via `KeyGroupRangeAssignment.computeKeyGroupForKeyHash`) lies
  * inside that operator index's key-group range. Each scan starts one past the key chosen for
  * the previous partition, so the chosen keys are strictly increasing and therefore distinct.
  *
  * @param maxParallelism     max parallelism passed to Flink's key-group computations
  * @param parallelism        operator parallelism passed to Flink's key-group computations
  * @param numberOfPartitions number of batch partitions that need a key; a value `<= 0`
  *                           yields an empty map
  * @return map from partition index to its effective key
  */
def getEffectivePartitionKeys(maxParallelism: Int, parallelism: Int, numberOfPartitions: Int): Map[Int, Int] = {

  // Exact integer arithmetic (widened to Long so the product cannot overflow) instead of
  // multiplying by a Double step. With parallelism = 2 and 98 partitions,
  // 49 * (2.0 / 98) == 0.9999999999999999, which truncates to index 0 instead of 1.
  def operatorIndexFor(partition: Int): Int =
    (partition.toLong * parallelism / numberOfPartitions).toInt

  // Smallest key >= `key` whose key group falls inside `keyRange`.
  @tailrec
  def firstKeyInRange(keyRange: KeyGroupRange, key: Int): Int = {
    val keyGroup = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(key.hashCode(), maxParallelism)
    if (keyRange.contains(keyGroup)) key else firstKeyInRange(keyRange, key + 1)
  }

  def findPartitionEffectiveKey(partition: Int, startKey: Int): Int = {
    // The key-group range depends only on the partition, so compute it once per partition
    // rather than once per candidate key.
    val keyRange: KeyGroupRange =
      KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
        maxParallelism,
        parallelism,
        operatorIndexFor(partition)
      )
    val key = firstKeyInRange(keyRange, startKey)
    logger.info(
      s"Batch partition: $partition with effective key: $key is assigned to key group range from: ${keyRange.getStartKeyGroup} to ${keyRange.getEndKeyGroup}"
    )
    key
  }

  // Thread the next candidate key through the fold; the first partition starts searching at 0.
  (0 until numberOfPartitions)
    .foldLeft((Map.empty[Int, Int], 0)) { case ((assigned, nextCandidate), partition) =>
      val key = findPartitionEffectiveKey(partition, nextCandidate)
      (assigned.updated(partition, key), key + 1)
    }
    ._1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment