// IMDB Data: Top 50 ratings
// Input files on HDFS:
// user/gontiv/solutions/imdbdata/title.akas.tsv
// user/gontiv/solutions/imdbdata/title.ratings.tsv
/*
spark-shell --master yarn \
--conf spark.ui.port=12456 \
--num-executors 10 \
--executor-memory 3G \
--executor-cores 2 \
--packages com.databricks:spark-avro_2.10:2.0.1
*/
// Create RDD and remove the header line
val titlesRatingRDD = sc.textFile("user/gontiv/solutions/imdbdata/title.ratings.tsv")
val titlesRatingHeader = titlesRatingRDD.first
val titlesRatingWithoutHeader = titlesRatingRDD.filter(rec => rec != titlesRatingHeader)
// Parse each line into a (titleId, averageRating, numVotes) tuple
val titlesRatingMap = titlesRatingWithoutHeader.map(rec => {
  val r = rec.split("\t")
  (r(0), r(1).toDouble, r(2).toInt)
})
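// Optional sanity check: preview a few parsed tuples to confirm the
// TSV split produced the expected (titleId, averageRating, numVotes) shape.
titlesRatingMap.take(3).foreach(println)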
// Create DataFrame and register temp table
val titlesRatingDF = titlesRatingMap.toDF("titleId", "averageRating", "numVotes")
titlesRatingDF.registerTempTable("TitlesRating")
// Create RDD and remove the header line
val titlesAkasRDD = sc.textFile("user/gontiv/solutions/imdbdata/title.akas.tsv")
val titlesAkasHeader = titlesAkasRDD.first
val titlesAkasWithoutHeader = titlesAkasRDD.filter(rec => rec != titlesAkasHeader)
// Parse each line, keeping titleId, title, region, and language
// (columns 0, 2, 3, 4 of title.akas.tsv)
val titlesAkasMap = titlesAkasWithoutHeader.map(rec => {
  val r = rec.split("\t")
  (r(0), r(2), r(3), r(4))
})
// Create DataFrame and register temp table
val titlesAkasDF = titlesAkasMap.toDF("titleId", "title", "region", "language")
titlesAkasDF.registerTempTable("TitlesAkas")
// SQL query: take the 50 most-voted titles, then join back to their alternate titles
val sqlResult = sqlContext.sql("select a.titleId, a.title, a.region, a.language, b.averageRating, b.numVotes from " +
  "TitlesAkas a, (select titleId, averageRating, numVotes from " +
  "TitlesRating order by numVotes desc limit 50) b " +
  "where a.titleId = b.titleId " +
  "order by numVotes desc")
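// Optional preview of the joined result before writing it out
// (show truncates wide columns by default).
sqlResult.show(10)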
// Desired outputs:
/*
a. Text file
Columns separated with tab "\t"
Compression: BZip2Codec */
sqlResult.rdd.
  map(rec => rec.mkString("\t")).
  coalesce(1).
  saveAsTextFile("user/gontiv/solutions/imdbdata/SQL/text",
    classOf[org.apache.hadoop.io.compress.BZip2Codec])
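// Optional read-back check: Hadoop input formats decompress .bz2 part files
// transparently, so the compressed output re-reads with plain sc.textFile.
sc.textFile("user/gontiv/solutions/imdbdata/SQL/text").take(5).foreach(println)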
/*
b. Sequence file.
Compression: BZip2Codec*/
// Key each record by titleId; the value is the full row, comma-joined
val out = sqlResult.rdd.map(rec => (rec.getString(0), rec.mkString(",")))
out.coalesce(1).saveAsSequenceFile("user/gontiv/solutions/imdbdata/SQL/seq",
  Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
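// Optional read-back check: the spark-shell's auto-imported implicits convert
// the Text/Text pairs back to Strings.
sc.sequenceFile[String, String]("user/gontiv/solutions/imdbdata/SQL/seq").take(3).foreach(println)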
/*
c. JSON file.
Compression: BZip2Codec*/
// Two alternatives (run only one; both target the same output directory).
// write.json gives no codec control in Spark 1.x, so this one is uncompressed:
sqlResult.coalesce(1).write.json("user/gontiv/solutions/imdbdata/SQL/json")
// toJSON + saveAsTextFile does apply the BZip2 codec:
sqlResult.toJSON.coalesce(1).saveAsTextFile("user/gontiv/solutions/imdbdata/SQL/json",
  classOf[org.apache.hadoop.io.compress.BZip2Codec])
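// Optional read-back check: read.json infers the schema from the records,
// compressed or not.
sqlContext.read.json("user/gontiv/solutions/imdbdata/SQL/json").printSchema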
/*
d. Parquet.
Compression: uncompressed */
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
sqlResult.coalesce(1).write.parquet("user/gontiv/solutions/imdbdata/SQL/parquet1")
sqlResult.coalesce(1).save("user/gontiv/solutions/imdbdata/SQL/parquet2", "parquet")
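// Optional read-back check on the Parquet output.
sqlContext.read.parquet("user/gontiv/solutions/imdbdata/SQL/parquet1").show(5)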
/*
e. ORC file. */
// ORC in Spark 1.x requires a HiveContext (which sqlContext typically is
// in distributions built with Hive support)
sqlResult.coalesce(1).write.orc("user/gontiv/solutions/imdbdata/SQL/orc")
sqlResult.coalesce(1).save("user/gontiv/solutions/imdbdata/SQL/orc1", "orc")
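// Optional read-back check (again assumes sqlContext is a HiveContext).
sqlContext.read.orc("user/gontiv/solutions/imdbdata/SQL/orc").count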
/*
f. Avro file.
Compression: uncompressed*/
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
import com.databricks.spark.avro._
sqlResult.coalesce(1).write.avro("user/gontiv/solutions/imdbdata/SQL/avro")
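// Optional read-back check using the .avro reader added by the spark-avro import above.
sqlContext.read.avro("user/gontiv/solutions/imdbdata/SQL/avro").show(5)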