// Load from file
// Read the raw movies CSV from HDFS; each RDD element is one line of text.
val moviesDump = sc.textFile("hdfs://localhost:8020/user/datalake/movies/ml-latest/movies.csv")

/** One parsed CSV row: the movie id, its title, and the genres it is tagged with. */
case class Movie(movieId : Integer, title : String, genres : List[String])

// Parse every non-header line into a Movie.
//  - field 0 is the id, the last field is the '|'-separated genre list;
//  - everything in between is the title. A title that itself contains commas is
//    cut into several fields by split(","), so the pieces are re-joined with ","
//    (joining with "" silently dropped those commas from the title).
//  - NOTE(review): assumes the file follows standard CSV quoting, i.e. a title
//    containing commas is wrapped in double quotes; one enclosing pair is removed.
val movies = {
  val dataRows = moviesDump
    .map(line => line.split(","))
    .filter(fields => fields(0) != "movieId") // drop the header row
  dataRows.map { fields =>
    val lastIdx = fields.size - 1
    val rawTitle = fields.slice(1, lastIdx).mkString(",")
    val title =
      if (rawTitle.length >= 2 && rawTitle.startsWith("\"") && rawTitle.endsWith("\""))
        rawTitle.substring(1, rawTitle.length - 1)
      else
        rawTitle
    Movie(fields(0).toInt, title, fields(lastIdx).split('|').toList)
  }
}
movies.take(3).foreach(m => println(m))
movies.count
//data sample
//Movie(1,Toy Story (1995),List(Adventure, Animation, Children, Comedy, Fantasy))
//Movie(2,Jumanji (1995),List(Adventure, Children, Fantasy))
//Movie(3,Grumpier Old Men (1995),List(Comedy, Romance))
// Transform data to (genre, movie) pairs
// Flatten the per-movie genre lists into a single RDD of genre names;
// a genre appears once for every movie that lists it.
val genresDup = movies.flatMap(_.genres)
genresDup.count
// Reduce the occurrences to the distinct genre names and print them on the driver.
val genresUnique1 = genresDup.distinct()
genresUnique1.collect().foreach(genre => println(genre))
// Example output:
//War
//Fantasy
//Western
//Musical
//Horror
// Convert to pair RDD
// Key every movie by (movieId, title) and keep its genre list as the value.
val mf = movies.map(m => ((m.movieId, m.title), m.genres))
mf.take(3).foreach(println)
// Use flatMapValues: emits one (key, genre) pair per element of the genre list,
// leaving the (movieId, title) key unchanged.
val mfm = mf.flatMapValues(x => x)
mfm.take(10).foreach(println)
println("===")
mfm.first
//sample data
//((1,Toy Story (1995)),Adventure)
//((1,Toy Story (1995)),Animation)
// Invert key-values
// Swap each ((movieId, title), genre) pair so that the genre becomes the key.
val genre2Movie = mfm.map(pair => (pair._2, pair._1))
genre2Movie.take(10).foreach(println)
println("===")
// Emit (genre, 1) for every (movie, genre) pair, ready to be summed per genre.
val genre2Count = mfm.map(pair => (pair._2, 1))
genre2Count.take(10).foreach(println)
genre2Count.count
println("===")
// Example genre2Movie output:
//(Adventure,(1,Toy Story (1995)))
//(Animation,(1,Toy Story (1995)))
//(Children,(1,Toy Story (1995)))
// Count by key
// Sum the per-pair 1s to get, for each genre, the number of (movie, genre) pairs.
val counts = genre2Count.reduceByKey(_ + _)
counts.count
// Bring the result (one entry per genre) to the driver once and reuse it for
// every print below instead of re-running the job for each collect().
val countsOnDriver = counts.collect()
countsOnDriver.foreach(println)
println("===collect==")
// Previously counts.foreach(a => println(a)): RDD.foreach runs the closure on the
// executors, so on a cluster the lines go to executor stdout and never reach the
// driver console. Printing the collected array keeps the output on the driver.
countsOnDriver.foreach(a => println(a))
println("===")
//sample data
//(War,1345)
//(Fantasy,1692)
//(Western,779)
// Same counts again, formatted as "genre,count" lines.
countsOnDriver.foreach { case (key, value) => println(s"$key,$value") }