Last active
September 21, 2017 17:19
-
-
Save udion/6cbbc1110ddfb53b96bf3c4cca57a658 to your computer and use it in GitHub Desktop.
playing with spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Bend your chest open so I can reach your heart. | |
I need to get inside, or I'll start a war. | |
Wanna look at the pieces that make you who you are. | |
I wanna build you up and pick you apart. | |
Let me see the dark sides as well as the bright. | |
I'm gonna love you inside out. | |
I'm gonna love you inside out. | |
Let me see the dark sides as well as the bright. | |
I'm gonna love you inside out. | |
I'm gonna love you inside out. | |
I'm gonna love you in. | |
I'm gonna love you. | |
I'm gonna love you. | |
I'm gonna pick your brain and get to know your thoughts. | |
So I can read your mind when you don't wanna talk. | |
And can I touch your face before you go. | |
I collect your scales but you don't have to know. | |
Let me see the dark sides as well as the bright. | |
I'm gonna love you inside out. | |
I'm gonna love you inside out. | |
Let me see the dark sides as well as the bright. | |
I'm gonna love you inside out. | |
I'm gonna love you inside out. | |
I'm gonna love you in. | |
I'm gonna love you. | |
I'm gonna love you. | |
I'm gonna love you inside out. | |
Your love, inside out. | |
I'm gonna love you inside out. | |
I'm gonna love you inside out. | |
Inside out. | |
Your love, inside out. | |
I'm gonna love, I'm gonna love you inside out. | |
Ou-ou-ou-out, ou-ou-ou-out. | |
I'm gonna love you. | |
Ou-ou-ou-out, ou-ou-ou-out. | |
I'm gonna love you. | |
Ou-ou-ou-out, ou-ou-ou-out. | |
I'm gonna love you . |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.util.LongAccumulator;
import scala.Tuple2;
import scala.Tuple3;
public final class wordCount { | |
private static final Pattern SPACE = Pattern.compile(" "); | |
public static List<String> getWords(String s) { | |
String[] words = s.split(" "); | |
return Arrays.asList(words); | |
} | |
public static List<Tuple2<String, String>> getWordsPair(String s) { | |
String[] words = s.split(" "); | |
List<Tuple2<String, String>> wordsPair = new ArrayList<Tuple2<String, String>>(); | |
for(int i=0; i<words.length - 1; i++) { | |
wordsPair.add(new Tuple2<String, String>(words[i], words[i+1])); | |
} | |
return wordsPair; | |
} | |
public static Tuple2<Tuple2<String,String>, Tuple2<Float, Float>> restructure3_4(Tuple2<String, Tuple2<Tuple2<String, Float>, Float>> t){ | |
String w2 = t._1(); | |
String w1 = t._2()._1()._1(); | |
float cnt1 = t._2()._1()._2(); | |
float cnt2 = t._2()._2(); | |
Tuple2<String, String>x1 = new Tuple2<String, String>(w1,w2); | |
Tuple2<Float, Float>x2 = new Tuple2<Float, Float>(cnt1, cnt2); | |
Tuple2<Tuple2<String, String>, Tuple2<Float, Float>> x = new Tuple2<Tuple2<String,String>, Tuple2<Float, Float> >(x1, x2); | |
return x; | |
} | |
public static Tuple2<Tuple2<String, String>, Tuple3<Float, Float, Float> >restructure4_5(Tuple2<Tuple2<String, String>, Tuple2<Tuple2<Float, Float>, Float>> s){ | |
Tuple2<String, String> wp = s._1(); | |
float cnt1 = s._2()._1()._1(); | |
float cnt2 = s._2()._1()._2(); | |
float cntpair = s._2()._2(); | |
Tuple3<Float, Float, Float> x = new Tuple3<Float, Float, Float>(cntpair, cnt1, cnt2); | |
Tuple2<Tuple2<String, String>, Tuple3<Float, Float, Float>> f = new Tuple2<Tuple2<String,String>, Tuple3<Float, Float, Float>>(wp,x); | |
return f; | |
} | |
public static void main(String[] args){ | |
if (args.length < 1) { | |
System.err.println("Usage: JavaWordCount <file>"); | |
System.exit(1); | |
} | |
SparkSession spark = SparkSession | |
.builder() | |
.appName("JavaWordCount") | |
.getOrCreate(); | |
//to get the spark context | |
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); | |
//Code to split a file into sentences: | |
sc.hadoopConfiguration().set("textinputformat.record.delimiter","."); | |
String path = args[0]; | |
JavaRDD<String> lines = sc.textFile("file:///"+path).cache(); | |
lines = lines.map(s -> s.toLowerCase().replaceAll("[^A-za-z]", " ").replaceAll("( )+", " ").trim()); | |
//RDD's for the words and the pairwords of each of the sentences | |
//that can be created by using the RDD's of lines | |
//using flatmap and the functions which can return me the splits | |
JavaRDD<String> words = lines.flatMap(s -> getWords(s).iterator()); | |
JavaRDD<Tuple2<String, String>> wordsPair = lines.flatMap(s -> getWordsPair(s).iterator()); | |
//to get the counts of the #words and #number of words | |
JavaPairRDD<String, Integer> wordOnes = words.mapToPair(s -> new Tuple2<>(s,1)); | |
JavaPairRDD< Tuple2<String, String>, Integer> pairOnes = wordsPair.mapToPair(s -> new Tuple2<>(s,1)); | |
JavaPairRDD<String, Integer> wordCount = wordOnes.reduceByKey((i1, i2) -> i1+i2); | |
JavaPairRDD< Tuple2<String, String>, Integer> pairCount = pairOnes.reduceByKey((i1, i2) -> i1+i2); | |
//to get the total count all together of the documents given | |
SparkContext ssc = sc.sc(); | |
LongAccumulator wcval = ssc.longAccumulator(); | |
LongAccumulator wpcval = ssc.longAccumulator(); | |
wordCount.foreach(t -> wcval.add(t._2())); | |
pairCount.foreach(t -> wpcval.add(t._2())); | |
float wc = wcval.value(); | |
float wpc = wpcval.value(); | |
//to get the probability of each of the words and pairs | |
JavaPairRDD<String, Float> wordProb = wordCount.mapToPair(s -> new Tuple2<String, Float>(s._1(), s._2()/wc)); | |
JavaPairRDD<Tuple2<String, String>, Float> pairProb = pairCount.mapToPair(s -> new Tuple2<Tuple2<String, String>, Float>(s._1(), s._2()/wpc)); | |
//To get the joins of various RDDs | |
JavaPairRDD<String, String> wP = wordsPair.mapToPair(s -> s); | |
JavaPairRDD<String ,Tuple2<String, Float>> wP1 = wP.join(wordProb); //this gives me (w1, (w2, cnt1)) | |
JavaPairRDD<String, Tuple2<String, Float>> wP2 = wP1.mapToPair(s -> new Tuple2<>(s._2()._1(), new Tuple2<>(s._1(), s._2()._2()))); | |
//wp2 has (w2, (w1,cnt1)) | |
JavaPairRDD<String, Tuple2<Tuple2<String, Float>, Float>> wP3 = wP2.join(wordProb); | |
//wp3 has (w2, ((w1,cnt1), cnt2)), restructuring | |
JavaPairRDD<Tuple2<String,String>, Tuple2<Float, Float>> wP4 = wP3.mapToPair(s -> restructure3_4(s)); | |
//wP4 has ((w1, w2), (cnt1, cnt2)) | |
JavaPairRDD<Tuple2<String, String>, Tuple2<Tuple2<Float, Float>, Float>> wP5 = wP4.join(pairProb); | |
JavaPairRDD<Tuple2<String, String>, Tuple3<Float, Float, Float>> wP6 = wP5.mapToPair(s -> restructure4_5(s)); | |
//to avoid redundancy in the final RDD | |
JavaPairRDD<Tuple2<String, String>, Tuple3<Float, Float, Float>> wP7 = wP6.reduceByKey((i1, i2)->i1); | |
//to apply the filter selecting only those pair which occured more than | |
//10 times as we would have expected a random word pair to be | |
JavaPairRDD<Tuple2<String, String>, Tuple3<Float, Float, Float>> wP8 = wP7.filter(s -> s._2()._1() > 10*s._2()._2()*s._2()._3()); | |
//priniting the total count | |
List<Tuple2<String, Integer>> wordCnt = wordCount.reduceByKey((i1, i2)->i1).collect(); | |
List< Tuple2<String, Tuple2<String, Float>> >temp = wP2.reduceByKey((i1, i2)->i1).collect(); | |
List<Tuple2< Tuple2<String, String>, Integer>> pairwordCnt = pairCount.reduceByKey((i1, i2)->i1).collect(); | |
List< Tuple2<Tuple2<String, String>, Tuple3<Float, Float, Float>> > wpf = wP7.collect(); | |
List< Tuple2<String, Float> >temp1 = wordProb.reduceByKey((i1, i2)->i1).collect(); | |
List< Tuple2<Tuple2<String, String>, Tuple3<Float, Float, Float>> >wpFilter = wP8.collect(); | |
// for(Tuple2<?,?> tuple: wordCnt) { | |
// System.out.println("word count for "+tuple._1()+":"+tuple._2()); | |
// } | |
// | |
// for(Tuple2<?,?> tuple: pairwordCnt) { | |
// Tuple2<String,String> p= (Tuple2<String, String>) tuple._1(); | |
// System.out.println("pair count for {"+p._1()+" "+p._2()+"}:"+tuple._2()); | |
// } | |
// | |
System.out.println("\n\n word: word_probability...."); | |
for(Tuple2<?,?> p: temp1) { | |
String w1 = (String) p._1(); | |
float c = (float)p._2(); | |
System.out.println("probability of "+w1+": " +c); | |
} | |
System.out.println("\n\n (word pair): joint_probability...."); | |
for(Tuple2<?,?> p: temp) { | |
String w2 = (String) p._1(); | |
String w1 = (String) ((Tuple2<String, Float>) p._2())._1(); | |
float c = (float) ((Tuple2<String, Float>) p._2())._2(); | |
System.out.println("probabbility of ("+w1+" "+w2+"): "+c); | |
} | |
System.out.println("\n\n word pair :(joint_probability | word1_probability | word2_probability)...."); | |
for(Tuple2<?,?> t: wpf) { | |
String w1 = ((Tuple2<String, String>) t._1())._1(); | |
String w2 = ((Tuple2<String, String>) t._1())._2(); | |
float cntp = ((Tuple3<Float, Float, Float>) t._2())._1(); | |
float cnt1 = ((Tuple3<Float, Float, Float>) t._2())._2(); | |
float cnt2 = ((Tuple3<Float, Float, Float>) t._2())._3(); | |
System.out.println(w1+" "+w2+" :("+cntp+" | "+cnt1+" | "+cnt2+")"); | |
} | |
System.out.println("\n\nThe phrases used are...."); | |
for(Tuple2<?,?> t: wpFilter) { | |
String w1 = ((Tuple2<String, String>) t._1())._1(); | |
String w2 = ((Tuple2<String, String>) t._1())._2(); | |
float cntp = ((Tuple3<Float, Float, Float>) t._2())._1(); | |
float cnt1 = ((Tuple3<Float, Float, Float>) t._2())._2(); | |
float cnt2 = ((Tuple3<Float, Float, Float>) t._2())._3(); | |
System.out.println(w1+" "+w2+" :("+cntp+" | "+cnt1+" | "+cnt2+")"); | |
} | |
sc.close(); | |
spark.stop(); | |
} | |
} |
Author
udion
commented
Sep 21, 2017
•
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment