Last active
April 21, 2016 01:02
-
-
Save andrewpalumbo/d8015b8fe0ea30c6c4cc08ff9b819574 to your computer and use it in GitHub Desktop.
dals failure in FlinkDistributedTestSuite
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Process finished with exit code 137 | |
Caused by: java.lang.Exception: Deserializing the InputFormat ([(0,{0:0.3947476722883563,1:-0.08695028358267716,2:-1.0574297632219802,3:0.3268090996516988,4:-1.3667553319818917,5:-0.1794776700908003,6:1.078276508767426,7:-1.19520500669697,8:-0.48920817822415197,9:-0.01611590341576673,10:-0.3924584320254835,11:1.1084504280408736,12:-0.7766818602582699,13:-1.745148020967139,14:-0.30702403178017207,15:1.0870667203881104,16:0.5743916990799559,17:1.1374342122090273,18:-1.0523085600170734,19:-1.3638541557908512,20:-1.3315774874522164,21:0.13871074941128161,22:-0.1 ... |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Distributed ALS (Alternating Least Squares) factorization of drmA into
 * rank-k factors U and V such that A is approximated by U %*% V.t.
 *
 * @param drmA input distributed row matrix to factorize
 * @param k decomposition rank (number of latent factors), default 50
 * @param lambda regularization rate; 0.0 (the default) disables regularization
 * @param maxIterations hard cap on the number of ALS iterations (must be >= 1)
 * @param convergenceThreshold relative RMSE improvement below which iteration
 *        stops early; must be < 1.0. A value <= 0 disables the convergence
 *        test entirely, so all maxIterations are always run.
 * @return Result holding drmU, drmV and the per-iteration RMSE history
 *         (the history is empty when the convergence test is disabled)
 */
def dals[K](
    drmA: DrmLike[K],
    k: Int = 50,
    lambda: Double = 0.0,
    maxIterations: Int = 10,
    convergenceThreshold: Double = 0.10
): Result[K] = {

  assert(convergenceThreshold < 1.0, "convergenceThreshold")
  assert(maxIterations >= 1, "maxIterations")

  // Some mapblock() usage may require to know ClassTag[K] bound
  implicit val ktag = drmA.keyClassTag

  // A' is reused every iteration when solving for V.
  val drmAt = drmA.t

  // Initialize U and V so that they are identically distributed to A or A'
  var drmU = drmA.mapBlock(ncol = k) {
    case (keys, block) =>
      // Seed each block's slice of U with small (x 0.01) uniform random values.
      val rnd = RandomUtils.getRandom()
      val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01
      keys -> uBlock
  }

  var drmV: DrmLike[Int] = null
  var rmseIterations: List[Double] = Nil

  // ALS iterator
  var stop = false
  var i = 0
  while (!stop && i < maxIterations) {

    // Alternate. This is really what ALS is.
    // Solve V with U held fixed, then U with V held fixed. checkpoint()
    // forces materialization of each new factor; the previous cached copy
    // is released with uncache() before being overwritten.
    if (drmV != null) drmV.uncache()
    drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()

    drmU.uncache()
    drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()

    // Check if we are requested to do a convergence test; and do it if yes.
    if (convergenceThreshold > 0) {

      // Residual norm of the current approximation, scaled by matrix size.
      val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)

      if (i > 0) {
        // Relative RMSE improvement versus the previous iteration.
        val rmsePrev = rmseIterations.last
        val convergence = (rmsePrev - rmse) / rmsePrev

        if (convergence < 0) {
          // Negative improvement means the error grew; warn and bail out.
          log.warn("Rmse increase of %f. Should not happen.".format(convergence))
          // I guess error growth can happen in ideal data case?
          stop = true
        } else if (convergence < convergenceThreshold) {
          stop = true
        }
      }
      rmseIterations :+= rmse
    }

    i += 1
  }

  new Result(drmU, drmV, rmseIterations)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
test("dals") {
  val random = RandomUtils.getRandom

  // Problem dimensions: an m x n input with an effective rank of spectrumLen.
  val m = 500
  val n = 500
  val spectrumLen = 40

  // Exponentially decaying singular-value spectrum, floored at 1e-3.
  val spectrum = dvec((0 until spectrumLen).map(i => 300.0 * exp(-i) max 1e-3))
  printf("spectrum:%s\n", spectrum)

  // Assemble an ideal low-rank input A from two orthonormal QR factors
  // sandwiching the diagonal spectrum.
  val qLeft = qr(Matrices.symmetricUniformView(m, spectrumLen, 1234))._1
  val qRight = qr(Matrices.symmetricUniformView(n, spectrumLen, 2345))._1
  val inCoreA = (qLeft %*%: diagv(spectrum)) %*% qRight.t
  val drmA = drmParallelize(inCoreA, numPartitions = 2)

  // Factorize with distributed ALS and pull the factors back in-core.
  val (drmU, drmV, rmse) = dals(drmA = drmA, k = 20).toTuple
  val inCoreU = drmU.collect
  val inCoreV = drmV.collect
  val predict = inCoreU %*% inCoreV.t

  // Show a small corner of the control matrix next to its approximation.
  printf("Control block:\n%s\n", inCoreA(0 until 3, 0 until 3))
  printf("ALS factorized approximation block:\n%s\n", predict(0 until 3, 0 until 3))

  val err = (inCoreA - predict).norm
  printf("norm of residuals %f\n", err)
  printf("train iteration rmses: %s\n", rmse)

  // Residual norm of the rank-20 approximation must stay small.
  err should be < 15e-2
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
java.lang.IllegalStateException: unread block data | |
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431) | |
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383) | |
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) | |
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) | |
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) | |
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) | |
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) | |
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) | |
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) | |
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) | |
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:248) | |
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:84) | |
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) | |
at java.lang.Thread.run(Thread.java:745) | |
04/20/2016 20:58:38 Job execution switched to status FAILING. | |
java.lang.IllegalStateException: unread block data | |
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431) | |
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383) | |
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) | |
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) | |
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) | |
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) | |
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) | |
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) | |
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) | |
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) | |
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:248) | |
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:84) | |
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) | |
at java.lang.Thread.run(Thread.java:745) | |
04/20/2016 20:58:38 RangePartition: LocalSample(1/1) switched to CANCELED | |
04/20/2016 20:58:38 RangePartition: GlobalSample(1/1) switched to CANCELED | |
04/20/2016 20:58:38 RangePartition: Histogram(1/1) switched to CANCELED | |
04/20/2016 20:58:38 RangePartition: PreparePartition(1/1) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(1/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(2/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(3/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN RangePartition: Partition -> Partition(4/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(1/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(2/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(3/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.drm.RowsFlinkDrm.asBlockified(FlinkDrm.scala:52)) -> Map (Map at org.apache.mahout.flinkbindings.blas.FlinkOpMapBlock$.apply(FlinkOpMapBlock.scala:37)) -> FlatMap (FlatMap at org.apache.mahout.flinkbindings.drm.BlockifiedFlinkDrm.asRowWise(FlinkDrm.scala:93))(4/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(1/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(2/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(3/4) switched to CANCELED | |
04/20/2016 20:58:38 CHAIN MapPartition (MapPartition at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:71)) -> Combine (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(4/4) switched to CANCELED | |
04/20/2016 20:58:38 Reduce (Reduce at org.apache.mahout.flinkbindings.blas.FlinkOpAtA$.slim(FlinkOpAtA.scala:118))(1/1) switched to CANCELED | |
04/20/2016 20:58:38 DataSink (org.apache.flink.api.java.Utils$CollectHelper@17c4dc5b)(1/1) switched to CANCELED | |
04/20/2016 20:58:38 Job execution switched to status FAILED. | |
Job execution failed. | |
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. | |
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:716) | |
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662) | |
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:662) | |
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) | |
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) | |
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) | |
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) | |
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) | |
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) | |
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) | |
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) | |
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) | |
Caused by: java.lang.IllegalStateException: unread block data | |
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431) | |
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383) | |
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) | |
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) | |
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) | |
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) | |
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) | |
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) | |
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) | |
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) | |
at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:248) | |
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:84) | |
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) | |
at java.lang.Thread.run(Thread.java:745) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
ALS is an iterative algorithm, but note that at each iteration we call
DRM.checkpoint()
, which triggers a Flink execution of the plan and returns a CheckpointedDrmFlink
. We get
java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
and other errors, as shown above, for matrices larger than roughly 350 x 350 when the scalatests are given 4g of memory. While tweaking some of the Akka timeout variables we saw the following serialization error (as shown at the top of this gist): Process finished with exit code 137 Caused by: java.lang.Exception: Deserializing the InputFormat ([(0,{0:0.3947476722883563,1:-0.08695028358267716,2:-1.0574297632219802,3:0.3268090996516988,4:-1.3667553319818917,5:-0.1794776700908003,6:1.078276508767426,7:-1.19520500669697,8:-0.48920817822415197,9:-0.01611590341576673,10:-0.3924584320254835,11:1.1084504280408736,12:-0.7766818602582699,13:-1.745148020967139,14:-0.30702403178017207,15:1.0870667203881104,16:0.5743916990799559,17:1.1374342122090273,18:-1.0523085600170734,19:-1.3638541557908512,20:-1.3315774874522164,21:0.13871074941128161,22:-0.1 ...