Created
February 25, 2018 21:08
-
-
Save Ayush-Singhal28/0a86e39af448bf0e8e9085ad6b3dce5a to your computer and use it in GitHub Desktop.
SparkAssignment-2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Load a local file - file1 | |
scala> val file1 = sc.textFile("/home/knoldus/Desktop/file1.txt") | |
file1: org.apache.spark.rdd.RDD[String] = /home/knoldus/Desktop/file1.txt MapPartitionsRDD[1] at textFile at <console>:24 | |
// Creating a rdd and split based on '#' | |
scala> val fileRDD1 = file1.map(x => x.split('#')) | |
fileRDD1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26 | |
// Creating a map having id and state only | |
scala> val f1 = fileRDD1.map(x => (x(0),x(2).substring (x(2).lastIndexOf (' '), x(2).length()))) | |
f1: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:28 | |
scala> f1.collect | |
res27: Array[(String, String)] = Array((123," CA"), (456," AK"), (789," AL"), (101112," OR")) | |
scala> val f12 = sc.parallelize(f1.collect) | |
f12: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[4] at parallelize at <console>:30 | |
// Load a local file - file2 | |
scala> val file2 = sc.textFile("/home/knoldus/Desktop/file2.txt") | |
file2: org.apache.spark.rdd.RDD[String] = /home/knoldus/Desktop/file2.txt MapPartitionsRDD[6] at textFile at <console>:24 | |
// Creating a rdd and split based on '#' | |
scala> val fileRDD2 = file2.map(x => x.split('#')) | |
fileRDD2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at map at <console>:26 | |
// Creating a map with id,timestamp,sales | |
scala> val f2 = fileRDD2.map(x => (x(1),(x(0),x(2)))) | |
f2: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[8] at map at <console>:28 | |
scala> f2.collect | |
res28: Array[(String, (String, String))] = Array((123,(1454313600,123456)), (789,(1501578000,123457)), (456,(1470045600,123458)), (789,(1470049200,123459)), (789,(1470049201,223459))) | |
scala> val f34 = sc.parallelize(f2.collect) | |
f34: org.apache.spark.rdd.RDD[(String, (String, String))] = ParallelCollectionRDD[9] at parallelize at <console>:30 | |
// Joining of two RDDS | |
scala> val result = f12.join(f34) | |
result: org.apache.spark.rdd.RDD[(String, (String, (String, String)))] = MapPartitionsRDD[12] at join at <console>:40 | |
scala> result.map(x=>(x._2._1,x._2._2)).collect | |
res6: Array[(String, (String, String))] = Array((" AL",(1501578000,123457)), (" AL",(1470049200,123459)), (" AL",(1470049201,223459)), (" AK",(1470045600,123458)), (" CA",(1454313600,123456))) | |
scala> val result1 = result.map(x=>(x._2._1,x._2._2._1,x._2._2._2)) | |
result1: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[21] at map at <console>:44 | |
scala> result1.collect | |
res9: Array[(String, String, String)] = Array((" AL",1501578000,123457), (" AL",1470049200,123459), (" AL",1470049201,223459), (" AK",1470045600,123458), (" CA",1454313600,123456)) | |
// importing Java libraries of Date | |
scala> import java.text.SimpleDateFormat | |
import java.text.SimpleDateFormat | |
scala> import java.util.Date | |
import java.util.Date | |
// Conversion epoch to human understandable date | |
scala> val mdy = result1.map(x => (x._1,(new SimpleDateFormat("yyyy/MM/dd").format(new Date(((x._2).toLong) * 1000))),x._3)) | |
mdy: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[34] at map at <console>:46 | |
scala> mdy.saveAsTextFile("/home/knoldus/Desktop/mdy.txt") | |
scala> val my = result1.map(x => (x._1,(new SimpleDateFormat("yyyy/MM").format(new Date(((x._2).toLong) * 1000))),x._3)) | |
my: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[36] at map at <console>:46 | |
scala> my.saveAsTextFile("/home/knoldus/Desktop/my.txt") | |
scala> val d = result1.map(x => (x._1,(new SimpleDateFormat("dd").format(new Date(((x._2).toLong) * 1000))),x._3)) | |
d: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[38] at map at <console>:46 | |
scala> d.saveAsTextFile("/home/knoldus/Desktop/d.txt") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment