Classification of news20.binary dataset by LogisticRegressionWithSGD (Spark 1.0 MLlib)

The dataset used in the evaluation

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
sort -R news20.binary > news20.random
head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

Evaluated code

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.binary.1000",  multiclass=false)
val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.random.1000",  multiclass=false)
// val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.random.1000",  multiclass=false, numFeatures = 1354731 , minPartitions = 32)

val numFeatures = training.take(1)(0).features.size
//numFeatures: Int = 178560 for news20.binary.1000
//numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations=1)
myui commented Jun 17, 2014

I have evaluated LogisticRegressionWithSGD of Spark 1.0 MLlib on Hadoop 0.20.2-cdh3u6, but it does not work for a sparse dataset even though the number of training examples used in the evaluation is just 1,000.

It works fine for news20.binary.1000, which has 178,560 features. However, it does not work for news20.random.1000, where the number of features is much larger (1,354,731), even though the vectors are loaded as sparse vectors through MLUtils.loadLibSVMFile().

The execution does not seem to make progress, yet no error is reported in the spark-shell or in the stdout/stderr of the executors.

We used 32 executors, each with 7 GB of working memory (2 GB of which is for RDD storage).
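For reference, a minimal sketch of how that memory layout could map onto Spark 1.0 configuration properties; the values are inferred from the description above (7 GB per executor, roughly 2/7 of the heap for RDD storage), not the exact settings used, and the application name is made up. The number of executors (32) is determined by the cluster manager and submit options, not by these properties.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical configuration matching the setup described above:
// 7 GB executor heaps, with spark.storage.memoryFraction lowered so that
// roughly 2 GB of each heap is used for cached RDDs.
val conf = new SparkConf()
  .setAppName("news20-logreg-eval")             // made-up application name
  .set("spark.executor.memory", "7g")
  .set("spark.storage.memoryFraction", "0.3")   // ~2 GB of a 7 GB heap
val sc = new SparkContext(conf)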

myui commented Jun 17, 2014

aggregate at GradientDescent.scala:178 never finishes.
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178

We confirmed that GC is not happening during the aggregate.
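For context, the reduction at that line has the following shape: each task folds examples into a dense gradient accumulator of length numFeatures, and the driver then merges one such dense vector per partition. A simplified paraphrase (not the MLlib source; summing dense feature arrays here stands in for summing per-example gradients):

// Illustrative only: the accumulator is a *dense* array of length numFeatures
// (1,354,731 for news20.random.1000), and the driver receives one merged copy
// per partition in the final combine step.
val zeroGradient = new Array[Double](numFeatures)
val gradientSum = training.map(_.features.toArray).aggregate(zeroGradient)(
  seqOp = (acc, x) => { var i = 0; while (i < acc.length) { acc(i) += x(i); i += 1 }; acc },
  combOp = (a, b) => { var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a }
)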

myui commented Jun 17, 2014

aggregate at LBFGS.scala:201 also never finishes.
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.random.1000", multiclass=false)
val numFeatures = data.take(1)(0).features.size

val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)
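
For completeness (the imports above include LogisticRegressionModel and BinaryClassificationMetrics), here is a sketch of how the returned weights could be turned into a model and scored, following the pattern in the MLlib documentation rather than code from this run:

// Split the appended bias term back off the weight vector.
val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))

// Clear the default 0.5 threshold so predict() returns raw scores for the AUC computation.
model.clearThreshold()

// Score the training data; a held-out split would be used for a real evaluation.
val scoreAndLabels = data.map { point =>
  (model.predict(point.features), point.label)
}

val metrics = new BinaryClassificationMetrics(scoreAndLabels)
println("Area under ROC = " + metrics.areaUnderROC())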

myui commented Jun 17, 2014

Posted a question to spark-users.
http://markmail.org/message/tur2d2yu75pk5ay2

myui commented Jun 18, 2014

Dense vectors are used in the aggregation. With 32 partitions, each partition sends a dense vector of 1,354,731 doubles (roughly 10 MB) to the driver, so the driver needs 300 MB+ just to receive them.

treeReduce is reportedly being introduced to cope with this. From the reply on the mailing list:

"treeReduce (treeAggregate) is a feature I'm testing now. It is a compromise between current reduce and butterfly allReduce. The former runs in linear time on the number of partitions, the latter introduces too many dependencies. treeAggregate with depth = 2 should run in O(sqrt(n)) time, where n is the number of partitions. It would be great if someone can help test its scalability."

http://markmail.org/message/tur2d2yu75pk5ay2#query:+page:1+mid:2eimvfcsxtfmi6cl+state:results
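
A minimal sketch of what that would look like for the gradient sum, assuming RDD.treeAggregate as described in the quote (it shipped in later Spark releases); as in the earlier sketch, dense feature arrays stand in for per-example gradients:

// Same reduction shape as the flat aggregate, but partial sums are merged on
// executors in a depth-2 tree, so the driver receives only a few partially
// merged dense vectors instead of one per partition.
val gradientSum = data.map(_.features.toArray)
  .treeAggregate(new Array[Double](numFeatures))(
    seqOp = (acc, x) => { var i = 0; while (i < acc.length) { acc(i) += x(i); i += 1 }; acc },
    combOp = (a, b) => { var i = 0; while (i < a.length) { a(i) += b(i); i += 1 }; a },
    depth = 2)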

myui commented Jun 19, 2014

I figured out what the problem is: "spark.akka.frameSize" was too large. By setting spark.akka.frameSize=10, it worked for the news20 dataset.

However, execution is slow for the much larger KDD Cup 2012, Track 2 dataset (235M+ records with 16.7M+ (2^24) sparse features, 33.6 GB in total) because the dense vectors are aggregated sequentially on a single driver node.

Aggregation took 7.6 minutes per iteration on 33 nodes.
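
For reference, one way the frame-size fix could be applied (a sketch; the property has to be set before the SparkContext is created, and the application name is made up):

import org.apache.spark.{SparkConf, SparkContext}

// spark.akka.frameSize is specified in MB; 10 is the value reported to work for news20.
val conf = new SparkConf()
  .setAppName("news20-logreg")
  .set("spark.akka.frameSize", "10")
val sc = new SparkContext(conf)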

myui commented Jul 12, 2014

14/07/13 14:23:36 INFO scheduler.TaskSetManager: Serialized task 4.0:265 as 25300257 bytes in 814 ms
14/07/13 14:23:36 INFO scheduler.TaskSetManager: Starting task 4.0:205 as TID 1068 on executor 31: dc10.dbgrid.org (NODE_LOCAL)
768.026: [Full GC [PSYoungGen: 447034K->345941K(932096K)] [ParOldGen: 2784818K->2784818K(2796224K)] 3231853K->3130759K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.7526850 secs] [Times: user=8.62 sys=0.00, real=0.76 secs]
14/07/13 14:23:37 INFO scheduler.TaskSetManager: Serialized task 4.0:205 as 25300257 bytes in 790 ms
14/07/13 14:23:37 INFO scheduler.TaskSetManager: Starting task 4.0:190 as TID 1069 on executor 9: dc18.dbgrid.org (NODE_LOCAL)
768.805: [Full GC [PSYoungGen: 433599K->365016K(932096K)] [ParOldGen: 2784818K->2784817K(2796224K)] 3218417K->3149834K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.6894240 secs] [Times: user=7.82 sys=0.00, real=0.69 secs]
769.519: [Full GC [PSYoungGen: 452911K->370646K(932096K)] [ParOldGen: 2784817K->2784815K(2796224K)] 3237728K->3155462K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.8372720 secs] [Times: user=9.11 sys=0.00, real=0.83 secs]
14/07/13 14:23:38 INFO scheduler.TaskSetManager: Serialized task 4.0:190 as 25300257 bytes in 1569 ms
14/07/13 14:23:38 INFO scheduler.TaskSetManager: Starting task 4.0:218 as TID 1070 on executor 21: dc12.dbgrid.org (NODE_LOCAL)
770.374: [Full GC [PSYoungGen: 433610K->389725K(932096K)] [ParOldGen: 2784815K->2784815K(2796224K)] 3218425K->3174540K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.7696960 secs] [Times: user=8.14 sys=0.00, real=0.77 secs]
14/07/13 14:23:39 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#292206058]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:236)
        at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:51)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:113)
        at org.apache.spark.storage.BlockManager$$anonfun$initialize$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(BlockManager.scala:158)
        at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:790)
        at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:158)
        at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
        at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
771.159: [Full GC [PSYoungGen: 452738K->395364K(932096K)] [ParOldGen: 2784815K->2784815K(2796224K)] 3237553K->3180180K(3728320K) [PSPermGen: 71342K->71342K(71936K)], 0.8778540 secs] [Times: user=9.45 sys=0.00, real=0.87 secs]
..
14/07/13 14:23:44 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1833)
        at java.io.ObjectOutputStream.write(ObjectOutputStream.java:686)
        at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:128)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1443)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1414)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:132)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:415)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:242)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:238)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:235)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:235)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:235)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:235)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:130)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:102)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
14/07/13 14:23:44 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/13 14:23:44 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/13 14:23:44 INFO Remoting: Remoting shut down
14/07/13 14:23:44 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/07/13 14:23:45 INFO network.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@553d70ce
14/07/13 14:23:45 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc08.dbgrid.org,45750)
14/07/13 14:23:45 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc08.dbgrid.org,45750)
14/07/13 14:23:45 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc08.dbgrid.org,45750)
14/07/13 14:23:45 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@553d70ce
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:362)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
