@myui
Last active September 26, 2017 12:35
Classification of news20.binary dataset by LogisticRegressionWithSGD (Spark 1.0 MLlib)

The dataset used in the Evaluation

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

head -1000 news20.binary | sed -e 's/^+1/1/' -e 's/^-1/0/' > news20.binary.1000
sort -R news20.binary > news20.random
head -1000 news20.random | sed -e 's/^+1/1/' -e 's/^-1/0/' > news20.random.1000
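
The label remapping done by the commands above can also be sketched in Scala. This is only an illustration, assuming LIBSVM-style lines of the form "label index:value ..."; remapLabel is a hypothetical helper, not part of the evaluated code, and it touches only the label at the start of the line.

```scala
// Hypothetical helper (not part of the gist): map the labels +1/-1 to 1/0.
// Only the label at the start of the line is rewritten; the "index:value"
// pairs that follow are left untouched.
def remapLabel(line: String): String =
  if (line.startsWith("+1 ")) "1" + line.drop(2)
  else if (line.startsWith("-1 ")) "0" + line.drop(2)
  else line

// Made-up sample rows, for illustration only.
val sample = Seq("+1 3:0.5 7:0.25", "-1 1:1.0 9:0.125")
val remapped = sample.map(remapLabel)
```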

Evaluated code

import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

//val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.binary.1000",  multiclass=false)
val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.random.1000",  multiclass=false)
// val training = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.random.1000",  multiclass=false, numFeatures = 1354731 , minPartitions = 32)

val numFeatures = training.take(1)(0).features.size
//numFeatures: Int = 178560 for news20.binary.1000
//numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations=1)

myui commented Jun 17, 2014

aggregate at LBFGS.scala:201 also never finishes.
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc, "hdfs://dm01:8020/dataset/news20-binary/raw/news20.random.1000", multiclass=false)
val numFeatures = data.take(1)(0).features.size

val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)


myui commented Jun 17, 2014

Posted a question to spark-users.
http://markmail.org/message/tur2d2yu75pk5ay2


myui commented Jun 18, 2014

Dense vectors are used in the aggregation. If you have 32 partitions and each one sends a dense vector of size 1,354,731 to the master, then the driver needs 300 MB+ (30 MB+?).
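
The estimate can be checked with a little arithmetic. The sketch below assumes each dense vector element is an 8-byte Double and ignores object headers and serialization overhead; the helper names are made up for illustration. Under that assumption one vector is roughly 10.8 MB, and 32 of them held at the same time come to roughly 347 MB.

```scala
// Assumption: each element of a dense vector is an 8-byte Double;
// object headers and serialization overhead are ignored.
val bytesPerDouble = 8L

// Size of one dense vector with the given number of features.
def denseVectorBytes(numFeatures: Long): Long = numFeatures * bytesPerDouble

// Memory needed if the driver holds one dense vector per partition at once.
def driverBytes(numPartitions: Int, numFeatures: Long): Long =
  numPartitions * denseVectorBytes(numFeatures)

val perVector = denseVectorBytes(1354731L) // 10,837,848 bytes, about 10.8 MB
val total = driverBytes(32, 1354731L)      // 346,811,136 bytes, about 347 MB
```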

treeReduce is apparently going to be introduced to cope with this driver-side bottleneck.

treeReduce (treeAggregate) is a feature I'm testing now. It is a
compromise between current reduce and butterfly allReduce. The former
runs in linear time on the number of partitions, the latter introduces
too many dependencies. treeAggregate with depth = 2 should run in
O(sqrt(n)) time, where n is the number of partitions. It would be
great if someone can help test its scalability.

http://markmail.org/message/tur2d2yu75pk5ay2#query:+page:1+mid:2eimvfcsxtfmi6cl+state:results
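
The idea in the quoted reply can be illustrated on plain local collections. This is a minimal sketch, not Spark's implementation: the "partitions" are ordinary arrays, and flatAggregate and twoLevelAggregate are hypothetical names. With a flat reduce the final combiner merges all n partial vectors; with a two-level tree it merges only about sqrt(n) intermediate results.

```scala
// Local sketch only: the "partial results" are plain arrays, not RDD partitions.
def addInPlace(acc: Array[Double], v: Array[Double]): Array[Double] = {
  var i = 0
  while (i < acc.length) { acc(i) += v(i); i += 1 }
  acc
}

// Sum a sequence of vectors of dimension dim into a fresh accumulator.
def sumVectors(vs: Seq[Array[Double]], dim: Int): Array[Double] =
  vs.foldLeft(new Array[Double](dim))(addInPlace)

// Flat reduce: the final combiner (the driver) merges all n partial results.
def flatAggregate(partials: Seq[Array[Double]], dim: Int): Array[Double] =
  sumVectors(partials, dim)

// Two-level tree: split the n partials into groups of about sqrt(n), combine
// each group first (in a cluster this step would run in parallel on workers),
// then merge the roughly sqrt(n) intermediate results.
def twoLevelAggregate(partials: Seq[Array[Double]], dim: Int): Array[Double] = {
  val groupSize = math.max(1, math.ceil(math.sqrt(partials.size.toDouble)).toInt)
  val intermediate = partials.grouped(groupSize).map(g => sumVectors(g, dim)).toSeq
  sumVectors(intermediate, dim)
}

// 32 made-up partial results of dimension 2.
val partials = Seq.tabulate(32)(i => Array(i.toDouble, 1.0))
val flat = flatAggregate(partials, 2)
val tree = twoLevelAggregate(partials, 2)
```

With 32 partials the group size is 6, so the final merge sees 6 vectors instead of 32, and both variants produce the same sum.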


myui commented Jun 19, 2014

I figured out what the problem was: "spark.akka.frameSize" was too large.
After setting spark.akka.frameSize=10, it worked for the news20 dataset.
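
One way to apply this setting is through SparkConf. The snippet below is a sketch that assumes a standalone application with spark-core on the classpath (in spark-shell, sc already exists and the property has to be supplied at launch instead). The application name is a placeholder; the value of spark.akka.frameSize is in MB.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch, assuming a standalone application rather than spark-shell.
// spark.akka.frameSize is given in MB; the app name is a placeholder.
val conf = new SparkConf()
  .setAppName("news20-logreg")
  .set("spark.akka.frameSize", "10")
val sc = new SparkContext(conf)
```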

However, execution is slow for the larger KDD Cup 2012, Track 2 dataset (235M+ records of 16.7M+ (2^24) sparse features in 33.6 GB) due to the sequential aggregation of dense vectors on a single driver node.

Aggregation took 7.6 minutes per iteration on 33 nodes.


myui commented Jun 19, 2014


myui commented Jul 12, 2014

14/07/13 14:23:36 INFO scheduler.TaskSetManager: Serialized task 4.0:265 as 25300257 bytes in 814 ms
14/07/13 14:23:36 INFO scheduler.TaskSetManager: Starting task 4.0:205 as TID 1068 on executor 31: dc10.dbgrid.org (NODE_LOCAL)
768.026: [Full GC [PSYoungGen: 447034K->345941K(932096K)] [ParOldGen: 2784818K->2784818K(2796224K)] 3231853K->3130759K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.7526850 secs] [Times: user=8.62 sys=0.00, real=0.76 secs]
14/07/13 14:23:37 INFO scheduler.TaskSetManager: Serialized task 4.0:205 as 25300257 bytes in 790 ms
14/07/13 14:23:37 INFO scheduler.TaskSetManager: Starting task 4.0:190 as TID 1069 on executor 9: dc18.dbgrid.org (NODE_LOCAL)
768.805: [Full GC [PSYoungGen: 433599K->365016K(932096K)] [ParOldGen: 2784818K->2784817K(2796224K)] 3218417K->3149834K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.6894240 secs] [Times: user=7.82 sys=0.00, real=0.69 secs]
769.519: [Full GC [PSYoungGen: 452911K->370646K(932096K)] [ParOldGen: 2784817K->2784815K(2796224K)] 3237728K->3155462K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.8372720 secs] [Times: user=9.11 sys=0.00, real=0.83 secs]
14/07/13 14:23:38 INFO scheduler.TaskSetManager: Serialized task 4.0:190 as 25300257 bytes in 1569 ms
14/07/13 14:23:38 INFO scheduler.TaskSetManager: Starting task 4.0:218 as TID 1070 on executor 21: dc12.dbgrid.org (NODE_LOCAL)
770.374: [Full GC [PSYoungGen: 433610K->389725K(932096K)] [ParOldGen: 2784815K->2784815K(2796224K)] 3218425K->3174540K(3728320K) [PSPermGen: 71339K->71339K(71936K)], 0.7696960 secs] [Times: user=8.14 sys=0.00, real=0.77 secs]
14/07/13 14:23:39 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
akka.pattern.AskTimeoutException: Recipient[Actor[akka://spark/user/BlockManagerMaster#292206058]] had already been terminated.
        at akka.pattern.AskableActorRef$.ask$extension(AskSupport.scala:134)
        at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:236)
        at org.apache.spark.storage.BlockManagerMaster.sendHeartBeat(BlockManagerMaster.scala:51)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$heartBeat(BlockManager.scala:113)
        at org.apache.spark.storage.BlockManager$$anonfun$initialize$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(BlockManager.scala:158)
        at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:790)
        at org.apache.spark.storage.BlockManager$$anonfun$initialize$1.apply$mcV$sp(BlockManager.scala:158)
        at akka.actor.Scheduler$$anon$9.run(Scheduler.scala:80)
        at akka.actor.LightArrayRevolverScheduler$$anon$3$$anon$2.run(Scheduler.scala:241)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:722)
771.159: [Full GC [PSYoungGen: 452738K->395364K(932096K)] [ParOldGen: 2784815K->2784815K(2796224K)] 3237553K->3180180K(3728320K) [PSPermGen: 71342K->71342K(71936K)], 0.8778540 secs] [Times: user=9.45 sys=0.00, real=0.87 secs]
..
14/07/13 14:23:44 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
        at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1833)
        at java.io.ObjectOutputStream.write(ObjectOutputStream.java:686)
        at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:128)
        at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1443)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1414)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:346)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
        at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:132)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:415)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5$$anonfun$apply$2.apply$mcVI$sp(TaskSchedulerImpl.scala:242)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:238)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$5.apply(TaskSchedulerImpl.scala:235)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:235)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:235)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:235)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:130)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:102)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
14/07/13 14:23:44 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/13 14:23:44 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/07/13 14:23:44 INFO Remoting: Remoting shut down
14/07/13 14:23:44 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/07/13 14:23:45 INFO network.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@553d70ce
14/07/13 14:23:45 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(dc08.dbgrid.org,45750)
14/07/13 14:23:45 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc08.dbgrid.org,45750)
14/07/13 14:23:45 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(dc08.dbgrid.org,45750)
14/07/13 14:23:45 INFO network.ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@553d70ce
java.nio.channels.CancelledKeyException
        at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:362)
        at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:115)
