Last active
April 25, 2016 17:57
Problem when empty partitions are encountered.
Matrix A:{
  0 => {0:1.0,1:2.0}
  1 => {0:3.0,1:4.0}
}
Matrix B:{
  0 => {0:3.0,1:4.0}
  1 => {0:5.0,1:6.0}
}
import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._ // needed for blockIter.asScala

val blocksAKeyed = blocksA.mapPartition(new RichMapPartitionFunction[(Array[K1], Matrix),
                                                                     (Int, Array[K1], Matrix)] {
  // partition number
  var part: Int = 0

  // get the index of the partition
  override def open(params: Configuration): Unit = {
    part = getRuntimeContext.getIndexOfThisSubtask
  }

  // bind the partition number to each keySet/block
  def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)],
                   out: Collector[(Int, Array[K1], Matrix)]): Unit = {
    val blockIter = values.iterator()
    if (blockIter.hasNext()) {
      val r = part -> blockIter.next
      require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
      out.collect((r._1, r._2._1, r._2._2))
    }
  }
})
val blocksBKeyed = blocksB.mapPartition(new RichMapPartitionFunction[(Array[K2], Matrix),
                                                                     (Int, Array[K2], Matrix)] {
  // partition number
  var part: Int = 0

  // get the index of the partition
  override def open(params: Configuration): Unit = {
    part = getRuntimeContext.getIndexOfThisSubtask
  }

  // bind the partition number to each keySet/block
  def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)],
                   out: Collector[(Int, Array[K2], Matrix)]): Unit = {
    val blockIter = values.iterator()
    if (blockIter.hasNext()) {
      val r = part -> blockIter.next
      require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and B of AB'")
      out.collect((r._1, r._2._1, r._2._2))
    }
  }
})
Partition # bound to blocks of Matrix B (partitions 0,1/4)
0 -> {
  0 => {0:5.0,1:6.0}
}
1 -> {
  0 => {0:3.0,1:4.0}
}
Note that partition 2 has been moved into partition 0, and partitions are now out of order.
Partition # bound to blocks (1x2 matrices) of Matrix A (partitions 0,1/4)
0 -> {
  0 => {0:1.0,1:2.0}
}
1 -> {
  0 => {0:3.0,1:4.0}
}
Partition # bound to blocks (1x2 matrices) of Matrix B (partitions 1,2/4)
1 -> {
  0 => {0:3.0,1:4.0}
}
2 -> {
  0 => {0:5.0,1:6.0}
}
import org.apache.flink.api.common.functions.{MapFunction, ReduceFunction, RichMapPartitionFunction}
import org.apache.flink.api.scala._ // createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._ // needed for blockIter.asScala

val blocksAKeyed = blocksA.mapPartition(new RichMapPartitionFunction[(Array[K1], Matrix),
                                                                     (Int, Array[K1], Matrix)] {
  // partition number
  var part: Int = 0

  // get the index of the partition
  override def open(params: Configuration): Unit = {
    part = getRuntimeContext.getIndexOfThisSubtask
  }

  // bind the partition number to each keySet/block
  def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)],
                   out: Collector[(Int, Array[K1], Matrix)]): Unit = {
    val blockIter = values.iterator()
    if (blockIter.hasNext()) {
      val r = part -> blockIter.next
      require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
      out.collect((r._1, r._2._1, r._2._2))
    }
  }
})

// calculate the actual number of non-empty partitions used by blocksA.
// we'll need this to key blocksB with the correct partition numbers
// to join upon. blocksA may use partitions 0,1 and blocksB may use partitions 2,3.
val aNonEmptyParts = blocksA.map(new MapFunction[(Array[K1], Matrix), Int] {
  // 1 for every block that holds at least one key, 0 otherwise
  def map(a: (Array[K1], Matrix)): Int = {
    if (a._1.length > 0) {
      1
    } else {
      0
    }
  }
}).reduce(new ReduceFunction[Int] {
  def reduce(a: Int, b: Int): Int = a + b
}).collect().head

implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]

val blocksBKeyed = blocksB
  // repartition B blocks into the number of non-empty partitions used by A
  .setParallelism(aNonEmptyParts)
  // map and assign 0-based ordinals to each matrix block.
  .mapPartition(new RichMapPartitionFunction[(Array[K2], Matrix),
                                             (Int, Array[K2], Matrix)] {
    // partition number
    var part: Int = 0

    // get the index of the partition; this should be in [0, degree of parallelism)
    override def open(params: Configuration): Unit = {
      part = getRuntimeContext.getIndexOfThisSubtask
    }

    // bind the partition number to each keySet/block
    def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)],
                     out: Collector[(Int, Array[K2], Matrix)]): Unit = {
      val blockIter = values.iterator()
      if (blockIter.hasNext()) {
        val r = part -> blockIter.next
        require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and B of AB'")
        out.collect((r._1, r._2._1, r._2._2))
      }
    }
  })
I'm running into an issue when the number of elements in a `DataSet` is less than the global degree of parallelism. In the example above, two 2x2 matrices are blockified into a `DataSet`, which here means that each 1x2 "row" block goes into its own partition. For Matrix A, the row blocks land in partitions 0 and 1. For Matrix B, however, they end up in partitions 1 and 2. I need to assign the ordinal index of each block's partition to that block, and then join on that index. With empty partitions in play, though, the only index the two blockified matrices share is 1, so the blocks in partitions 0 and 2 are lost.

I've tried explicitly setting the degree of parallelism for Matrix B to the number of non-empty partitions in Matrix A, but this changes the order of the `DataSet`.
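
To make the effect concrete, here is a minimal sketch in plain Scala collections. It is not Flink code: it only models four subtasks that each hold at most one block, using the layout described above. The names `Block`, `keyBySubtask`, `keyByOrdinal` and `join` are hypothetical and exist only for this illustration. It compares keying by the raw subtask index with keying by each block's rank among the non-empty partitions.

```scala
// Plain-Scala model of the partition layout (no Flink involved).
// All names in this object are hypothetical and for illustration only.
object EmptyPartitionSketch {
  type Block = Vector[Double]

  // Position in the Vector = subtask index; None = empty partition.
  val partsA: Vector[Option[Block]] =
    Vector(Some(Vector(1.0, 2.0)), Some(Vector(3.0, 4.0)), None, None)
  val partsB: Vector[Option[Block]] =
    Vector(None, Some(Vector(3.0, 4.0)), Some(Vector(5.0, 6.0)), None)

  // Key each block by the raw subtask index (what getIndexOfThisSubtask returns).
  def keyBySubtask(parts: Vector[Option[Block]]): Map[Int, Block] =
    parts.zipWithIndex.collect { case (Some(b), i) => i -> b }.toMap

  // Key each block by its rank among the non-empty partitions, preserving order.
  def keyByOrdinal(parts: Vector[Option[Block]]): Map[Int, Block] =
    parts.flatten.zipWithIndex.map { case (b, i) => i -> b }.toMap

  // Inner join of two keyed block sets on the key.
  def join(a: Map[Int, Block], b: Map[Int, Block]): Map[Int, (Block, Block)] =
    a.collect { case (k, blockA) if b.contains(k) => k -> (blockA, b(k)) }

  def main(args: Array[String]): Unit = {
    val bySubtask = join(keyBySubtask(partsA), keyBySubtask(partsB))
    val byOrdinal = join(keyByOrdinal(partsA), keyByOrdinal(partsB))
    println(s"keys joined on subtask index: ${bySubtask.keys.toList.sorted}")
    println(s"keys joined on ordinal:       ${byOrdinal.keys.toList.sorted}")
  }
}
```

In this model the join on the subtask index keeps only key 1, and it pairs row 1 of A with row 0 of B. The join on the ordinal keeps keys 0 and 1 with matching rows. Computing such an ordinal inside Flink would presumably require knowing how many blocks sit in the lower-numbered partitions first, which this sketch does not attempt.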