@andrewpalumbo
Last active April 25, 2016 17:57
Problem when empty partitions are encountered.
Matrix A:{
0 => {0:1.0,1:2.0}
1 => {0:3.0,1:4.0}
}
Matrix B:{
0 => {0:3.0,1:4.0}
1 => {0:5.0,1:6.0}
}
val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
(Int, Array[K1], Matrix)] {
// partition number
var part: Int = 0
// get the index of the partition
override def open(params: Configuration): Unit = {
part = getRuntimeContext.getIndexOfThisSubtask
}
// bind the partition number to each keySet/block
def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
val blockIter = values.iterator()
if (blockIter.hasNext()) {
val r = part -> blockIter.next
require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
out.collect((r._1, r._2._1, r._2._2))
}
}
})
val blocksBKeyed = blocksB.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
(Int, Array[K2], Matrix)] {
// partition number
var part: Int = 0
// get the index of the partition
override def open(params: Configuration): Unit = {
part = getRuntimeContext.getIndexOfThisSubtask
}
// bind the partition number to each keySet/block
def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
val blockIter = values.iterator()
if (blockIter.hasNext()) {
val r = part -> blockIter.next
require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and B of AB'")
out.collect((r._1, r._2._1, r._2._2))
}
}
})
Partition # bound to blocks of Matrix B (partitions 0,1/4)
0 -> {
0 => {0:5.0,1:6.0}
}
1 -> {
0 => {0:3.0,1:4.0}
}
Note that the block that was in partition 2 is now in partition 0, so the blocks of Matrix B are out of order.
Partition # bound to blocks (1x2 matrices) of Matrix A (partitions 0,1/4)
0 -> {
0 => {0:1.0,1:2.0}
}
1 -> {
0 => {0:3.0,1:4.0}
}
Partition # bound to blocks (1x2 matrices) of Matrix B (partitions 1,2/4)
1 -> {
0 => {0:3.0,1:4.0}
}
2 -> {
0 => {0:5.0,1:6.0}
}
val blocksAKeyed = blocksA.mapPartition( new RichMapPartitionFunction[(Array[K1], Matrix),
(Int, Array[K1], Matrix)] {
// partition number
var part: Int = 0
// get the index of the partition
override def open(params: Configuration): Unit = {
part = getRuntimeContext.getIndexOfThisSubtask
}
// bind the partition number to each keySet/block
def mapPartition(values: java.lang.Iterable[(Array[K1], Matrix)], out: Collector[(Int, Array[K1], Matrix)]): Unit = {
val blockIter = values.iterator()
if (blockIter.hasNext()) {
val r = part -> blockIter.next
require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and A of AB'")
out.collect((r._1, r._2._1, r._2._2))
}
}
})
// calculate the actual number of non-empty partitions used by blocksA
// we'll need this to key blocksB with the correct partition numbers
// to join upon. blocksA may use partitions 0,1 and blocksB may use partitions 2,3.
val aNonEmptyParts = blocksA.map(new MapFunction[(Array[K1], Matrix), Int] {
def map(a: (Array[K1], Matrix)): Int = {
if (a._1.length > 0) {
1
} else {
0
}
}
}).reduce(new ReduceFunction[Int] {
def reduce(a: Int, b: Int): Int = a + b
}).collect().head
implicit val typeInformationB = createTypeInformation[(Int, (Array[K2], Matrix))]
val blocksBKeyed = blocksB
// repartition B blocks into the number of non-empty partitions used by A
.setParallelism(aNonEmptyParts)
// map and assign 0-based ordinals to each matrix block.
.mapPartition( new RichMapPartitionFunction[(Array[K2], Matrix),
(Int, Array[K2], Matrix)] {
// partition number
var part: Int = 0
// get the index of the partition- this should be in [0, degree of parallelism)
override def open(params: Configuration): Unit = {
part = getRuntimeContext.getIndexOfThisSubtask
}
// bind the partition number to each keySet/block
def mapPartition(values: java.lang.Iterable[(Array[K2], Matrix)], out: Collector[(Int, Array[K2], Matrix)]): Unit = {
val blockIter = values.iterator()
if (blockIter.hasNext()) {
val r = part -> blockIter.next
require(!blockIter.hasNext, s"more than 1 (${blockIter.asScala.size + 1}) blocks per partition and B of AB'")
out.collect((r._1, r._2._1, r._2._2))
}
}
})
andrewpalumbo commented Apr 24, 2016

I'm running into an issue when the number of elements in a DataSet is less than the global degree of parallelism. In the example above, two 2x2 matrices are blockified into a DataSet, which in this case means that each (1x2 matrix) "row" goes into its own partition. For Matrix A, the (1x2 matrix) "rows" go into partitions 0 and 1. However, the (1x2 matrix) "rows" of Matrix B end up in partitions 1 and 2. I need to assign the ordinal index of each block's partition to that block, and then join on that index. With empty partitions, though, the intersection of the two blockified matrices' partition indices is only {1}, so the blocks in partition 0 (of A) and partition 2 (of B) get lost.
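
To make the lost blocks concrete, here is a minimal sketch that uses plain Scala collections in place of the Flink DataSets (no Flink API is involved, and the object and value names are made up for illustration). It keys the blocks by the partition indices listed above and shows which keys survive an inner join on that index.

```scala
// Plain Scala maps stand in for the keyed DataSets; this is an illustration only.
object EmptyPartitionJoin {
  // (partition index -> 1x2 block), mirroring the listings in this gist.
  val blocksAKeyed: Map[Int, Vector[Double]] =
    Map(0 -> Vector(1.0, 2.0), 1 -> Vector(3.0, 4.0))
  val blocksBKeyed: Map[Int, Vector[Double]] =
    Map(1 -> Vector(3.0, 4.0), 2 -> Vector(5.0, 6.0))

  // An inner join on the partition index keeps only keys present on both sides.
  val joinedKeys: Set[Int] = blocksAKeyed.keySet intersect blocksBKeyed.keySet

  // Blocks whose partition index has no partner on the other side.
  val lostFromA: Set[Int] = blocksAKeyed.keySet -- joinedKeys
  val lostFromB: Set[Int] = blocksBKeyed.keySet -- joinedKeys
}
```

Here `joinedKeys` is `Set(1)`, while `lostFromA` is `Set(0)` and `lostFromB` is `Set(2)`, which matches the behaviour described above.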

I've tried explicitly setting the degree of parallelism (dop) for Matrix B to the count of non-empty partitions in Matrix A; however, this changes the order of the DataSet.
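
For reference, this is roughly what an order-preserving ordinal would have to look like, again sketched with plain Scala collections rather than Flink. The helper `denseOrdinals` is hypothetical and not part of the code above; it assumes the set of occupied partition indices is already known (for example, collected to the driver), which may or may not be practical here.

```scala
// Sketch only: rank each occupied partition index among the occupied indices,
// so that sparse indices such as 1,2 become dense ordinals 0,1 in the same order.
object DenseOrdinals {
  // Hypothetical helper, not a Flink API.
  def denseOrdinals(occupied: Iterable[Int]): Map[Int, Int] =
    occupied.toSeq.distinct.sorted.zipWithIndex.toMap

  // Occupied partitions as listed in this gist.
  val aOrdinals: Map[Int, Int] = denseOrdinals(Seq(0, 1)) // 0 -> 0, 1 -> 1
  val bOrdinals: Map[Int, Int] = denseOrdinals(Seq(1, 2)) // 1 -> 0, 2 -> 1
}
```

With ordinals like these, both matrices would be keyed 0 and 1 in their original block order, so a join on the ordinal would pair every block.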
