Skip to content

Instantly share code, notes, and snippets.

@Ayush-Singhal28
Created February 25, 2018 21:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ayush-Singhal28/0a86e39af448bf0e8e9085ad6b3dce5a to your computer and use it in GitHub Desktop.
Save Ayush-Singhal28/0a86e39af448bf0e8e9085ad6b3dce5a to your computer and use it in GitHub Desktop.
SparkAssignment-2
// Step 1: read file1.txt and reduce each record to an (id, state) pair.
// Load the local file file1.txt as an RDD[String].
scala> val file1 = sc.textFile("/home/knoldus/Desktop/file1.txt")
file1: org.apache.spark.rdd.RDD[String] = /home/knoldus/Desktop/file1.txt MapPartitionsRDD[1] at textFile at <console>:24
// Split every element on '#', giving one Array[String] of fields per record.
scala> val fileRDD1 = file1.map(x => x.split('#'))
fileRDD1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:26
// Build (id, state) pairs: field 0 is the id; the state is the tail of field 2,
// taken from its last space character to the end of the string.
// NOTE(review): the substring starts AT the space, so every state keeps a leading blank
// (" CA", " AK", ... in the collect output below). Use lastIndexOf(' ') + 1 or trim if
// that blank is not wanted.
// NOTE(review): a record with fewer than three '#'-separated fields would fail on x(2).
scala> val f1 = fileRDD1.map(x => (x(0),x(2).substring (x(2).lastIndexOf (' '), x(2).length())))
f1: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[3] at map at <console>:28
// Inspect the pairs on the driver.
scala> f1.collect
res27: Array[(String, String)] = Array((123," CA"), (456," AK"), (789," AL"), (101112," OR"))
// Collect the pairs to the driver and turn the resulting array back into an RDD.
// NOTE(review): f1 already is an RDD[(String, String)], the same element type as f12,
// so this round trip looks unnecessary; presumably f1 could be joined directly.
scala> val f12 = sc.parallelize(f1.collect)
f12: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[4] at parallelize at <console>:30
// Step 2: read file2.txt and key each record by id.
// Load the local file file2.txt as an RDD[String].
scala> val file2 = sc.textFile("/home/knoldus/Desktop/file2.txt")
file2: org.apache.spark.rdd.RDD[String] = /home/knoldus/Desktop/file2.txt MapPartitionsRDD[6] at textFile at <console>:24
// Split every element on '#', giving one Array[String] of fields per record.
scala> val fileRDD2 = file2.map(x => x.split('#'))
fileRDD2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at map at <console>:26
// Build (id, (timestamp, sales)) pairs: field 1 becomes the key (the id), and
// fields 0 and 2 (timestamp and sales) become the value.
// NOTE(review): a record with fewer than three '#'-separated fields would fail on x(2).
scala> val f2 = fileRDD2.map(x => (x(1),(x(0),x(2))))
f2: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[8] at map at <console>:28
// Inspect the pairs on the driver; note that id 789 occurs three times.
scala> f2.collect
res28: Array[(String, (String, String))] = Array((123,(1454313600,123456)), (789,(1501578000,123457)), (456,(1470045600,123458)), (789,(1470049200,123459)), (789,(1470049201,223459)))
// Collect the pairs to the driver and turn the resulting array back into an RDD.
// NOTE(review): f2 already is an RDD with the same element type as f34, so this
// round trip looks unnecessary; presumably f2 could be joined directly.
scala> val f34 = sc.parallelize(f2.collect)
f34: org.apache.spark.rdd.RDD[(String, (String, String))] = ParallelCollectionRDD[9] at parallelize at <console>:30
// Step 3: join the two pair RDDs on their key (the id).
// Each result element is (id, (state, (timestamp, sales))).
// Id 101112 (" OR") is present in f12 but does not appear in the joined output below,
// so ids without a match on the other side are dropped.
scala> val result = f12.join(f34)
result: org.apache.spark.rdd.RDD[(String, (String, (String, String)))] = MapPartitionsRDD[12] at join at <console>:40
// Drop the id and look at (state, (timestamp, sales)) on the driver.
scala> result.map(x=>(x._2._1,x._2._2)).collect
res6: Array[(String, (String, String))] = Array((" AL",(1501578000,123457)), (" AL",(1470049200,123459)), (" AL",(1470049201,223459)), (" AK",(1470045600,123458)), (" CA",(1454313600,123456)))
// Flatten each element into a (state, timestamp, sales) triple; the id is dropped.
scala> val result1 = result.map(x=>(x._2._1,x._2._2._1,x._2._2._2))
result1: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[21] at map at <console>:44
// Inspect the triples on the driver.
scala> result1.collect
res9: Array[(String, String, String)] = Array((" AL",1501578000,123457), (" AL",1470049200,123459), (" AL",1470049201,223459), (" AK",1470045600,123458), (" CA",1454313600,123456))
// Step 4: format the timestamp of every (state, timestamp, sales) triple and save the result.
// Import the Java date classes used for formatting.
scala> import java.text.SimpleDateFormat
import java.text.SimpleDateFormat
scala> import java.util.Date
import java.util.Date
// Convert the timestamp to a human-readable date. The timestamp string is parsed as a
// Long and multiplied by 1000 before being wrapped in a Date, so the input is
// presumably epoch seconds.
// A new SimpleDateFormat is created for every record inside the lambda.
// NOTE(review): no time zone is set on the formatter, so the output presumably depends
// on the JVM default time zone of the machine running the job; confirm.
// mdy: (state, "yyyy/MM/dd", sales)
scala> val mdy = result1.map(x => (x._1,(new SimpleDateFormat("yyyy/MM/dd").format(new Date(((x._2).toLong) * 1000))),x._3))
mdy: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[34] at map at <console>:46
// NOTE(review): saveAsTextFile presumably creates a directory of part files at this
// path rather than a single "mdy.txt" file; confirm against the Spark docs.
scala> mdy.saveAsTextFile("/home/knoldus/Desktop/mdy.txt")
// my: (state, "yyyy/MM", sales), the same conversion truncated to year and month.
scala> val my = result1.map(x => (x._1,(new SimpleDateFormat("yyyy/MM").format(new Date(((x._2).toLong) * 1000))),x._3))
my: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[36] at map at <console>:46
scala> my.saveAsTextFile("/home/knoldus/Desktop/my.txt")
// d: (state, "dd", sales), the same conversion keeping only the day of the month.
scala> val d = result1.map(x => (x._1,(new SimpleDateFormat("dd").format(new Date(((x._2).toLong) * 1000))),x._3))
d: org.apache.spark.rdd.RDD[(String, String, String)] = MapPartitionsRDD[38] at map at <console>:46
scala> d.saveAsTextFile("/home/knoldus/Desktop/d.txt")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment