@andrewpalumbo
Last active April 21, 2016 01:02
dals failure in FlinkDistributedTestSuite
Process finished with exit code 137
Caused by: java.lang.Exception: Deserializing the InputFormat ([(0,{0:0.3947476722883563,1:-0.08695028358267716,2:-1.0574297632219802,3:0.3268090996516988,4:-1.3667553319818917,5:-0.1794776700908003,6:1.078276508767426,7:-1.19520500669697,8:-0.48920817822415197,9:-0.01611590341576673,10:-0.3924584320254835,11:1.1084504280408736,12:-0.7766818602582699,13:-1.745148020967139,14:-0.30702403178017207,15:1.0870667203881104,16:0.5743916990799559,17:1.1374342122090273,18:-1.0523085600170734,19:-1.3638541557908512,20:-1.3315774874522164,21:0.13871074941128161,22:-0.1 ...
def dals[K](
    drmA: DrmLike[K],
    k: Int = 50,
    lambda: Double = 0.0,
    maxIterations: Int = 10,
    convergenceThreshold: Double = 0.10
): Result[K] = {

  assert(convergenceThreshold < 1.0, "convergenceThreshold")
  assert(maxIterations >= 1, "maxIterations")

  // Some mapBlock() usage may require the ClassTag[K] bound
  implicit val ktag = drmA.keyClassTag

  val drmAt = drmA.t

  // Initialize U and V so that they are identically distributed to A or A'
  var drmU = drmA.mapBlock(ncol = k) {
    case (keys, block) =>
      val rnd = RandomUtils.getRandom()
      val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01
      keys -> uBlock
  }

  var drmV: DrmLike[Int] = null
  var rmseIterations: List[Double] = Nil

  // ALS iterator
  var stop = false
  var i = 0
  while (!stop && i < maxIterations) {

    // Alternate. This is really what ALS is.
    if (drmV != null) drmV.uncache()
    drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()

    drmU.uncache()
    drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()

    // Check whether we are requested to do a convergence test, and do it if so.
    if (convergenceThreshold > 0) {
      val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)
      if (i > 0) {
        val rmsePrev = rmseIterations.last
        val convergence = (rmsePrev - rmse) / rmsePrev
        if (convergence < 0) {
          log.warn("Rmse increase of %f. Should not happen.".format(convergence))
          // I guess error growth can happen in ideal data case?
          stop = true
        } else if (convergence < convergenceThreshold) {
          stop = true
        }
      }
      rmseIterations :+= rmse
    }

    i += 1
  }

  new Result(drmU, drmV, rmseIterations)
}
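For reference, each alternation in dals() is a closed-form regularized least-squares solve. Below is a minimal in-core sketch of the V half-step, mirroring the drmV assignment above but on in-core matrices; the updateV helper and the matrix names are illustrative, not part of the suite:

import org.apache.mahout.math.Matrix
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.scalabindings.RLikeOps._

// Given A (m x n) and the current U (m x k), the V update computes
// V = A' U solve(U'U - diag(lambda, k)), the same expression dals()
// evaluates distributedly over DRMs.
def updateV(inCoreA: Matrix, inCoreU: Matrix, k: Int, lambda: Double): Matrix =
  inCoreA.t %*% inCoreU %*% solve(inCoreU.t %*% inCoreU - diag(lambda, k))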
test("dals") {
val rnd = RandomUtils.getRandom
// Number of points
val m = 500
val n = 500
// Length of actual spectrum
val spectrumLen = 40
// Create singluar values with decay
val spectrum = dvec((0 until spectrumLen).map(x => 300.0 * exp(-x) max 1e-3))
printf("spectrum:%s\n", spectrum)
// Create A as an ideal input
val inCoreA = (qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1 %*%: diagv(spectrum)) %*%
qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1.t
val drmA = drmParallelize(inCoreA, numPartitions = 2)
// Decompose using ALS
val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple
val inCoreU = drmU.collect
val inCoreV = drmV.collect
val predict = inCoreU %*% inCoreV.t
printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))
val err = (inCoreA - predict).norm
printf("norm of residuals %f\n", err)
printf("train iteration rmses: %s\n", rmse)
err should be < 15e-2
}
java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:248)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:84)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
04/20/2016 20:58:38 Job execution switched to status FAILING.
java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:248)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:84)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
04/20/2016 20:58:38 RangePartition: LocalSample(1/1) switched to CANCELED
04/20/2016 20:58:38 RangePartition: GlobalSample(1/1) switched to CANCELED
04/20/2016 20:58:38 RangePartition: Histogram(1/1) switched to CANCELED
04/20/2016 20:58:38 RangePartition: PreparePartition(1/1) switched to CANCELED
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(1/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(2/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(3/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(4/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(1/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(2/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(3/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(4/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(1/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(2/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(3/4) switched to CANCELED
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(4/4) switched to CANCELED
04/20/2016 20:58:38 Reduce (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(1/1) switched to CANCELED
04/20/2016 20:58:38 DataSink (org.apache.flink.api.java.Utils$CollectHelper@17c4dc5b)(1/1) switched to CANCELED
04/20/2016 20:58:38 Job execution switched to status FAILED.
Job execution failed.
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:716)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:248)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:84)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
@andrewpalumbo (Author) commented:

ALS is an iterative algorithm, but note that at each iteration we call DRM.checkpoint(), which triggers a Flink execution of the plan and returns a CheckpointedDrmFlink.
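A minimal sketch of that behavior, with drmA and drmU standing in for any DRMs on the Flink backend:

// DRM operators are lazy: this line only builds a logical operator plan;
// nothing is submitted to the cluster yet.
val drmAtU = drmA.t %*% drmU

// checkpoint() runs the optimizer and submits the physical plan to Flink,
// returning a CheckpointedDrm (a CheckpointedDrmFlink on this backend).
// Hence each dals() iteration executes as its own Flink job.
val materialized = drmAtU.checkpoint()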

We get java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds] and other errors, as shown above, for matrices larger than roughly 350 x 350 when the scalatests are given 4g of memory. While tweaking some of the Akka timeout vars we saw the following serialization error (as shown at the top of this gist):

Process finished with exit code 137 Caused by: java.lang.Exception: Deserializing the InputFormat ([(0,{0:0.3947476722883563,1:-0.08695028358267716,2:-1.0574297632219802,3:0.3268090996516988,4:-1.3667553319818917,5:-0.1794776700908003,6:1.078276508767426,7:-1.19520500669697,8:-0.48920817822415197,9:-0.01611590341576673,10:-0.3924584320254835,11:1.1084504280408736,12:-0.7766818602582699,13:-1.745148020967139,14:-0.30702403178017207,15:1.0870667203881104,16:0.5743916990799559,17:1.1374342122090273,18:-1.0523085600170734,19:-1.3638541557908512,20:-1.3315774874522164,21:0.13871074941128161,22:-0.1 ...
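For reference, the Akka ask timeout we were tweaking defaults to 10 s, which lines up with the "Futures timed out after [10000 milliseconds]" message. A sketch of raising it on a test cluster's Configuration, assuming Flink 1.0's ConfigConstants key (the 100 s value is illustrative):

import org.apache.flink.configuration.{ConfigConstants, Configuration}

val conf = new Configuration()
// ConfigConstants.AKKA_ASK_TIMEOUT is the "akka.ask.timeout" key; its
// default of "10 s" matches the 10000 ms timeout seen above.
conf.setString(ConfigConstants.AKKA_ASK_TIMEOUT, "100 s")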
