Created
June 27, 2018 01:08
-
-
Save vikas-gonti/8775fce10c8f19ad26f0e0079b41f74d to your computer and use it in GitHub Desktop.
IMDB Data Top 50 ratings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// user/gontiv/solutions/imdbdata/title.akas.tsv
// user/gontiv/solutions/imdbdata/title.ratings.tsv
/*
spark-shell --master yarn \
  --conf spark.ui.port=12456 \
  --num-executors 10 \
  --executor-memory 3G \
  --executor-cores 2 \
  --packages com.databricks:spark-avro_2.10:2.0.1
*/
// --- title.ratings.tsv ingestion ---
// Load the ratings TSV, drop the header row, parse each line into
// (titleId, averageRating, numVotes), and register it as temp table "TitlesRating".
// NOTE(review): assumes rating/vote fields are always numeric — a malformed row
// would throw NumberFormatException; confirm the input is clean.
val titlesRatingRDD = sc.textFile("user/gontiv/solutions/imdbdata/title.ratings.tsv")
val titlesRatingHeader = titlesRatingRDD.first
val titlesRatingWithoutHeader = titlesRatingRDD.filter(_ != titlesRatingHeader)
// Split on tabs and build a typed tuple per record.
val titlesRatingMap = titlesRatingWithoutHeader.map { line =>
  val fields = line.split("\t")
  (fields(0), fields(1).toDouble, fields(2).toInt)
}
// Name the columns and expose the data to SQL.
val titlesRatingDF = titlesRatingMap.toDF("titleId", "averageRating", "numVotes")
titlesRatingDF.registerTempTable("TitlesRating")
// --- title.akas.tsv ingestion ---
// Load the akas TSV, drop the header row, keep columns 0/2/3/4
// (titleId, title, region, language), and register temp table "TitlesAkas".
val titlesAkasRDD = sc.textFile("user/gontiv/solutions/imdbdata/title.akas.tsv")
val titlesAkasHeader = titlesAkasRDD.first
val titlesAkasWithoutHeader = titlesAkasRDD.filter(_ != titlesAkasHeader)
// Split on tabs; column 1 ("ordering") is intentionally skipped.
val titlesAkasMap = titlesAkasWithoutHeader.map { line =>
  val fields = line.split("\t")
  (fields(0), fields(2), fields(3), fields(4))
}
// Name the columns and expose the data to SQL.
val titlesAkasDF = titlesAkasMap.toDF("titleId", "title", "region", "language")
titlesAkasDF.registerTempTable("TitlesAkas")
//sql query to get top 50 rated movies
// Pick the 50 titles with the most votes, then join back to their akas
// metadata. `val` instead of `var`: the result is never reassigned.
// Triple-quoted string replaces fragile "+"-concatenation (same query text).
val sqlResult = sqlContext.sql(
  """select a.titleId, a.title, a.region, a.language, b.averageRating, b.numVotes
    |from TitlesAkas a,
    |     (select titleId, averageRating, numVotes
    |      from TitlesRating
    |      order by numVotes desc limit 50) b
    |where a.titleId = b.titleId
    |order by numVotes desc""".stripMargin)
//desired outputs ..
/*
a. Text file
Columns to be separated with tab "\t"
Compression: BZip2Codec*/
// Render each row as a tab-delimited line and write a single
// BZip2-compressed text file. The original bound this to `val out`, but
// saveAsTextFile returns Unit (and the name was shadowed just below),
// so the pointless binding is dropped.
sqlResult.rdd.
  map(rec => rec.mkString("\t")).
  coalesce(1).
  saveAsTextFile("user/gontiv/solutions/imdbdata/SQL/text",
    classOf[org.apache.hadoop.io.compress.BZip2Codec])
/*
b. Sequence file.
Compression: BZip2Codec*/
// Key = titleId; value = the whole row rendered comma-separated
// (titleId deliberately repeated as the first value field, matching the
// original layout). mkString produces byte-identical strings to the
// original "+"-concatenation.
val out = sqlResult.rdd.map { row =>
  val value = Seq(row.getString(0), row.getString(1), row.getString(2),
    row.getString(3), row.getDouble(4), row.getInt(5)).mkString(",")
  (row.getString(0), value)
}
out.coalesce(1).saveAsSequenceFile(
  "user/gontiv/solutions/imdbdata/SQL/seq",
  Some(classOf[org.apache.hadoop.io.compress.BZip2Codec]))
/*
c. JSON file.
Compression: BZip2Codec*/
// BUG FIX: both JSON writes originally targeted the same directory
// ("SQL/json"), so the second save would fail with "path already exists".
// The BZip2-compressed variant now gets its own directory, mirroring the
// parquet1/parquet2 naming used for the other formats below.
sqlResult.coalesce(1).write.json("user/gontiv/solutions/imdbdata/SQL/json")
sqlResult.toJSON.coalesce(1).saveAsTextFile(
  "user/gontiv/solutions/imdbdata/SQL/json2",
  classOf[org.apache.hadoop.io.compress.BZip2Codec])
/*
d. Parquet.
Compression: uncompressed */
// Turn parquet compression off before writing.
sqlContext.setConf("spark.sql.parquet.compression.codec", "uncompressed")
sqlResult.coalesce(1).write.parquet("user/gontiv/solutions/imdbdata/SQL/parquet1")
// Same write expressed via the generic DataFrameWriter API — the deprecated
// DataFrame.save(path, source) delegates to exactly this call.
sqlResult.coalesce(1).write.format("parquet").save("user/gontiv/solutions/imdbdata/SQL/parquet2")
/*
e. ORC file. */
// Two equivalent ORC writes: the dedicated shortcut and the generic
// DataFrameWriter API (which the deprecated save(path, "orc") delegates to).
sqlResult.coalesce(1).write.orc("user/gontiv/solutions/imdbdata/SQL/orc")
sqlResult.coalesce(1).write.format("orc").save("user/gontiv/solutions/imdbdata/SQL/orc1")
/*
f. Avro file.
Compression: uncompressed*/
// Disable avro compression, then write via the generic DataFrameWriter
// format API — identical to the .avro(...) extension method the import provides.
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
import com.databricks.spark.avro._
sqlResult.coalesce(1).write.format("com.databricks.spark.avro").save("user/gontiv/solutions/imdbdata/SQL/avro")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment