The Java word-count example from https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_develop_run.html, rewritten with lambdas and streams, which makes it much shorter. It takes the input file path as args[0] and the minimum occurrence threshold as args[1].
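For a sense of what "much shorter" means, this is roughly how the tokenizing step alone reads in the anonymous-class style the Cloudera page uses. It is only a sketch against the Spark 1.x Java API (where FlatMapFunction returns an Iterable) and additionally needs an import of org.apache.spark.api.java.function.FlatMapFunction; the full lambda version follows.

// Pre-lambda sketch of the tokenizing step (Spark 1.x API); sc and filePath as in the class below.
JavaRDD<String> tokenized = sc.textFile(filePath).flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                // Split each line on spaces and hand the words back to Spark.
                return Arrays.asList(s.split(" "));
            }
        });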
package com.mycompany;

import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;

import scala.Tuple2;

/**
 * Created by juanmf on 15/07/17.
 */
public class WordCount {

    public static void main(String[] args) {
        // create Spark context with Spark configuration
        try (JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"))) {
            int threshold = Integer.parseInt(args[1]);
            String filePath = args[0];
            JavaPairRDD<String, Integer> filtered = filterWordsUnderThreshold(sc, threshold, filePath);

            // count characters
            JavaPairRDD<Character, Integer> charCounts = filtered
                    .flatMap(s -> s._1.chars()
                            .mapToObj(i -> (char) i)
                            .collect(Collectors.toList()))
                    .mapToPair(c -> new Tuple2<>(c, 1))
                    .reduceByKey((i1, i2) -> i1 + i2);

            System.out.println(charCounts.collect());
        }
    }

    private static JavaPairRDD<String, Integer> filterWordsUnderThreshold(
            JavaSparkContext sc, int threshold, String filePath) {
        // read in text file and split each document into words
        JavaRDD<String> tokenized = sc.textFile(filePath)
                .flatMap(s -> Arrays.asList(s.split(" ")));

        // count the occurrence of each word
        JavaPairRDD<String, Integer> counts = tokenized
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        // filter out words with fewer than threshold occurrences
        return counts.filter(tup -> tup._2 >= threshold);
    }
}
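Note that this targets the Spark 1.x Java API the Cloudera page uses, where flatMap lambdas return an Iterable. On Spark 2.x the FlatMapFunction contract changed to return an Iterator, so only the two flatMap calls need adjusting; a minimal sketch, assuming a Spark 2.x dependency:

// Spark 2.x: flatMap lambdas must return an Iterator instead of an Iterable.
JavaRDD<String> tokenized = sc.textFile(filePath)
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator());

JavaPairRDD<Character, Integer> charCounts = filtered
        .flatMap(s -> s._1.chars().mapToObj(i -> (char) i).iterator())
        .mapToPair(c -> new Tuple2<>(c, 1))
        .reduceByKey((i1, i2) -> i1 + i2);

Since the SparkConf only sets an app name, the class is meant to be launched through spark-submit (e.g. with --class com.mycompany.WordCount); for a quick local run you would either pass --master local[*] to spark-submit or call setMaster("local[*]") on the SparkConf.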