Created
September 27, 2017 06:43
-
-
Save paul-brebner/71174cae87887a94ad4707e9a8f1741c to your computer and use it in GitHub Desktop.
Simple Spark MLLib Decision Tree Example (RDD)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package spark1; | |
import scala.Tuple2; | |
import java.util.HashMap; | |
import java.util.Map; | |
import org.apache.spark.SparkConf; | |
import org.apache.spark.api.java.JavaPairRDD; | |
import org.apache.spark.api.java.JavaRDD; | |
import org.apache.spark.api.java.function.Function; | |
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics; | |
import org.apache.spark.mllib.regression.LabeledPoint; | |
import org.apache.spark.mllib.tree.DecisionTree; | |
import org.apache.spark.mllib.tree.model.DecisionTreeModel; | |
import org.apache.spark.mllib.util.MLUtils; | |
import org.apache.spark.SparkContext; | |
public class DecisionTreeBlog2 { | |
public static void main(String[] args) { | |
SparkConf conf = new SparkConf().setAppName("Java Decision Tree Classification Example"); | |
conf.setMaster("local"); | |
SparkContext sc = new SparkContext(conf); | |
String path = "WillTheMonolithReact.txt"; | |
// check where best to cache? Here, there, or everywhere? | |
// here as we count it later (more than once). | |
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD().cache(); | |
// how many samples do we have? | |
// count() is an action so will cause everything up to here to be executed for the 1st time | |
long n = data.count(); | |
System.out.println("RDD size = " + n); | |
// count number of positive and negative examples | |
// functions to return true if label == 0 or label == 1 | |
// LabeledPoint is a tuple of (label, features). | |
Function<LabeledPoint, Boolean> label0 = row -> (row.label() == 0.0); | |
Function<LabeledPoint, Boolean> label1 = row -> (row.label() == 1.0); | |
// Note: Only need to count positives! | |
Double neg = (double) data.filter(label0).count(); | |
Double pos = (double) data.filter(label1).count(); | |
System.out.println("pos examples = " + pos); | |
System.out.println("neg examples = " + neg); | |
System.out.println("probability of positive example = " + pos/(double)n); | |
System.out.println("probability of negative example = " + neg/(double)n); | |
// Split sample RDD into two sets, 60% training data, 40% testing data. 11 is a seed. | |
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.6, 0.4}, 11L); | |
JavaRDD<LabeledPoint> trainingData = splits[0].cache(); // cache the data | |
// should we cache testData as well?? | |
JavaRDD<LabeledPoint> testData = splits[1]; | |
// Set parameters for DecisionTree learning. | |
// Empty categoricalFeaturesInfo indicates all features are continuous. | |
Integer numClasses = 2; | |
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); | |
String impurity = "gini"; // or “entropy” | |
Integer maxDepth = 5; | |
Integer maxBins = 32; | |
// Train DecisionTree model | |
// org.apache.spark.mllib.tree.DecisionTree | |
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins); | |
System.out.println("Learned classification tree model:\n" + model.toDebugString()); | |
// Evaluate the model on the testData. | |
// For every example in testData, p, replace it by a Tuple of (predicted category, labelled category) | |
// E.g. (1.0,0.0) (0.0,0.0) (0.0,0.0) (0.0,1.0) | |
JavaPairRDD<Object, Object> predictionAndLabels = testData.mapToPair(p -> | |
new Tuple2<>(model.predict(p.features()), p.label())); | |
// Get evaluation metrics. | |
BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(predictionAndLabels.rdd()); | |
// Decision Trees don't have probabilities so only 1 and 0 thresholds. | |
System.out.println("Thresholds: " + metrics.thresholds()); | |
// Precision by threshold | |
JavaRDD<Tuple2<Object, Object>> precision = metrics.precisionByThreshold().toJavaRDD(); | |
System.out.println("Precision by threshold: " + precision.collect()); | |
// Recall by threshold | |
JavaRDD<Tuple2<Object, Object>> recall = metrics.recallByThreshold().toJavaRDD(); | |
System.out.println("Recall by threshold: " + recall.collect()); | |
// F by threshold | |
JavaRDD<Tuple2<Object, Object>> f = metrics.fMeasureByThreshold().toJavaRDD(); | |
System.out.println("F by threshold: " + f.collect()); | |
} | |
} |
Author
paul-brebner
commented
Jun 19, 2019
via email
Hi, sorry for the delay — I just noticed your question. Did you manage to get
it working? It's relatively old code, so it may need updating. Regards,
Paul
…On Sun, 16 Jun 2019 at 04:19, teuku-wahyu ***@***.***> wrote:
i have tried the code in singlenode and its working but how about running
on yarn mode ? i have done that and got some errors like this
" diagnostics: Application application_1556362909924_0040 failed 2 times
due to AM Container for appattempt_1556362909924_0040_000002 exited with
exitCode: 10
For more detailed output, check application tracking page:
http://tfa2:8088/cluster/app/application_1556362909924_0040Then, click on
links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_1556362909924_0040_02_000001
Exit code: 10
Stack trace: ExitCodeException exitCode=10:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
at org.apache.hadoop.util.Shell.run(Shell.java:456)
at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 10
Failing this attempt. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1560620938125
final status: FAILED
tracking URL: http://tfa2:8088/cluster/app/application_1556362909924_0040
user: hdfs
Exception in thread "main" org.apache.spark.SparkException: Application
application_1556362909924_0040 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1034)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1081)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/06/16 00:51:15 INFO util.ShutdownHookManager: Shutdown hook called"
please help me ... :')
—
You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub
<https://gist.github.com/71174cae87887a94ad4707e9a8f1741c?email_source=notifications&email_token=AHQI4O6IVG2PKOMBBKJQU4TP2UXDJA5CNFSM4HYPSFWKYY3PNVWWK3TUL52HS4DFVNDWS43UINXW23LFNZ2KUY3PNVWWK3TUL5UWJTQAFTXJI#gistcomment-2944660>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AHQI4OYEAO65UR6SJITCLULP2UXDJANCNFSM4HYPSFWA>
.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment