Skip to content

Instantly share code, notes, and snippets.

@rajesh-h
Created April 18, 2020 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rajesh-h/0515bc0be93c1ee96ace2b821fa7b56d to your computer and use it in GitHub Desktop.
Save rajesh-h/0515bc0be93c1ee96ace2b821fa7b56d to your computer and use it in GitHub Desktop.
//Assignment 1:
//Answer A:
//Reading input (Databricks console). The header row is removed by
//filtering out every line equal to the first line of the file.
var inp = sc.textFile("/FileStore/tables/log_file.txt")
// get the Header
var header = inp.first()
//Get only detail records by removing the header
var details = inp.filter(row => row != header)
// Column layout of each CSV row:
// 0 = date, 1 = time, 2 = size, 3 = r_version, 4 = r_arch,
// 5 = r_os, 6 = package, 7 = version, 8 = country, 9 = ip_id
// Answer B - Ignore records where package = "NA" or version = "NA".
// Split each row once instead of twice (avoids redundant per-row work).
var validDetails = details.filter { row =>
  val cols = row.split(",")
  cols(6) != "NA" && cols(7) != "NA"
}
//Answer C - download count per package.
var answerC = validDetails.map(row => (row.split(",")(6), 1)).reduceByKey((x, y) => x + y).collect()
//Answer D:
//Create an RDD of (country, download size); split once per row.
var answerD_0 = validDetails.map { row =>
  val cols = row.split(",")
  (cols(8), cols(2).toInt)
}
//Get the sum and count to calculate the average per country.
// FIX: the original computed `sum / count` on Ints, which silently
// truncated the average (integer division) and risked Int overflow on
// the running sum. Accumulate as Long and divide as Double.
var answerD = answerD_0.mapValues(x => (x.toLong, 1L))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
  .mapValues { case (sum, count) => sum.toDouble / count }
  .collect()
//Assignment 2:
// Answer A
//Reading input (Databricks console): read and drop the header row.
var inp = sc.textFile("/FileStore/tables/log_file.txt")
var header = inp.first()
var details = inp.filter(row => row != header)
//Below is required only if NA package/version rows must be ignored;
//not applied here per the assignment statement.
// var validDetails = details.filter(row => row.split(",")(6) != "NA" && row.split(",")(7) != "NA" )
//Answer B: lookup table of country code -> country name.
// FIX: "DENAMRK" was misspelled; corrected to "DENMARK".
// NOTE(review): "CH" is the ISO 3166-1 code for Switzerland, not China
// ("CN" is China) — confirm which code actually appears in the log data.
var countryMapRdd = sc.parallelize(List(("AU","AUSTRALIA"), ("FR","FRANCE"), ("NL","NETHERLANDS"), ("US","UNITED STATES"), ("DE","GERMANY"), ("GB","GREAT BRITAIN"), ("DK","DENMARK"), ("CH","CHINA")))
//Answer C:
//Strip surrounding double quotes from the country code so it can be
//joined against countryMapRdd, then count downloads per code.
//NOTE: the inner join silently drops countries absent from the map.
var answerC_0 = details.map(row => (row.split(",")(8).stripPrefix("\"").stripSuffix("\""), 1)).reduceByKey((x, y) => x + y)
var answerC = answerC_0.join(countryMapRdd).map(x => x._1 + "," + x._2._2 + "," + x._2._1)
//Answer D: persist as "code,name,count" text lines.
answerC.saveAsTextFile("/FileStore/tables/assignment2output")
// Assignment 3:
//Answer A:
//Reading input (Databricks console): read and drop the header row.
var inp = sc.textFile("/FileStore/tables/log_file.txt")
var header = inp.first()
var details = inp.filter(row => row != header)
//Answer B: convert the raw lines to a typed DataFrame
//(size is parsed as Int; all other columns stay String).
var inputDF = details.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt, x(3), x(4), x(5), x(6), x(7), x(8), x(9))).toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")
//Answer C: bucket each download by size.
import org.apache.spark.sql.functions._
// FIX: the original conditions left size == 100000 unmatched by both
// "Small" (< 100000) and "Medium" (> 100000), so it fell through to
// "Big". Medium now covers [100000, 1000000).
var inputDF_0 = inputDF.withColumn("Download_Type",
  when(inputDF("size") < 100000, "Small")
  .when(inputDF("size") >= 100000 && inputDF("size") < 1000000, "Medium")
  .otherwise("Big"))
//Answer D: count downloads per (country, bucket).
inputDF_0.groupBy("country","Download_Type").count().show()
//Answer E: persist the same aggregation as Parquet.
inputDF_0.groupBy("country","Download_Type").count().write.parquet("download_details.parquet")
//Assignment 4:
//Answer A:
//Reading input (Databricks console): read and drop the header row.
var inp = sc.textFile("/FileStore/tables/log_file.txt")
var header = inp.first()
var details = inp.filter(row => row != header)
//Build the DataFrame (size parsed as Int, all other columns String).
var inputDF = details.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt, x(3), x(4), x(5), x(6), x(7), x(8), x(9))).toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")
//Write as JSON.
inputDF.write.format("json").save("/FileStore/tables/log_file.json")
//Answer B: read it back.
// FIX: the original line had no `var`/`val`, which does not compile in
// Scala (bare assignment to an undeclared name).
var inpJson = sqlContext.read.json("/FileStore/tables/log_file.json")
//Answer C: max, min and average size per date.
// FIX: the original computed the aggregation and discarded the result;
// bind it once and reuse it for display and for the JSON write.
var sizeStats = inpJson.groupBy("date").agg(max("size") as "MaxSize", min("size") as "MinSize", avg("size") as "AvgSize")
sizeStats.show()
//Write to JSON, ordered by date.
sizeStats.orderBy("date").write.format("json").save("/FileStore/tables/assignment_4.json")
//Assignment 5:
//Streaming (not tested practically).
import org.apache.spark.streaming.{Seconds, StreamingContext}
// FIX: the original `ssc = StreamingContext(sc, 10)` was missing `var`
// and `new`, and the batch interval must be a Duration (Seconds(10)),
// not a bare Int.
var ssc = new StreamingContext(sc, Seconds(10))
var details = ssc.socketTextStream("localhost", 9999)
// FIX: a DStream has no .toDF — each micro-batch RDD must be converted
// to a DataFrame inside foreachRDD before it can be written out.
// mode("append") is required because every batch writes to the same
// path (the original would fail on the second batch).
details.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    var inputDF = rdd.map(x => x.split(","))
      .map(x => (x(0), x(1), x(2).toInt, x(3), x(4), x(5), x(6), x(7), x(8), x(9)))
      .toDF("date","time","size","r_version","r_arch","r_os","package","version","country","ip_id")
    inputDF.filter(inputDF("size") > 100000).write.mode("append").format("csv").save("/FileStore/tables/logs_large_downloads/log_file.csv")
    // FIX: was `< 100000`, which dropped rows with size exactly 100000
    // from both outputs; `<=` makes the two filters a full partition.
    inputDF.filter(inputDF("size") <= 100000).write.mode("append").format("csv").save("/FileStore/tables/logs_others/log_file.csv")
  }
}
// FIX: the original never started the streaming context, so no data
// would ever be processed.
ssc.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment