@ramv-dailymotion · Created December 5, 2015
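// Given ALS ratings keyed by user id and tagged with a k-means cluster id: count videos per
// cluster, flatten each cluster's counts into a JSON array string, join those recommendations
// back onto each user's cluster assignment, and save the result as Parquet.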
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

public static void saveAlsKMeansRecosAsParquet(JavaPairRDD<Integer, Tuple2<DmRating, Integer>> userIdRatingClusterIdRDD,
                                               int numPartitions,
                                               JavaSparkContext javaSparkContext,
                                               String outdir) {
    // Flatten (userId -> (rating, clusterId)) pairs into self-contained DmRating records.
    JavaRDD<DmRating> dmRatingJavaRDD = userIdRatingClusterIdRDD.map(new Function<Tuple2<Integer, Tuple2<DmRating, Integer>>, DmRating>() {
        @Override
        public DmRating call(Tuple2<Integer, Tuple2<DmRating, Integer>> v1) throws Exception {
            Tuple2<DmRating, Integer> values = v1._2();
            DmRating rating = values._1();
            Integer clusterId = values._2();
            rating.setClusterId(clusterId);
            rating.setVideoId(rating.product());
            // Prefer the site account id; fall back to the visitor (v1st) id when it is absent.
            rating.setV1stOrUserId((rating.userId == null || rating.userId.isEmpty()) ? rating.v1stId : rating.userId);
            rating.setRedisId(rating.user());
            return rating;
        }
    });
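    // Expose the enriched ratings to Spark SQL as a temp table so the projections below can be written in SQL.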
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    DataFrame dmRatingDF = sqlContext.createDataFrame(dmRatingJavaRDD, DmRating.class);
    dmRatingDF.registerTempTable("dmrating");
    DataFrame clusterIdVideoIdDF = sqlContext.sql("SELECT clusterId, videoId FROM dmrating").cache();
    // Count how often each video occurs in each cluster.
    DataFrame rolledupClusterIdVideoIdDF = clusterIdVideoIdDF.rollup("clusterId", "videoId").count().cache();
    DataFrame clusterIdUserIdDF = sqlContext.sql("SELECT clusterId, userId, redisId, v1stId FROM dmrating").distinct().cache();
    //clusterIdUserIdDF.write().json(outdir + "/userid_clusterid");
    //TODO commented out for testing
    //rolledupClusterIdVideoIdDF.write().json(outdir + "/clusterid_videoid");
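    // rollup() also emits subtotal rows (videoId null) and a grand-total row (both null),
    // so the filter below keeps only the fully specified (clusterId, videoId) counts.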
    JavaRDD<Row> rolledUpRDD = rolledupClusterIdVideoIdDF.toJavaRDD();
    JavaRDD<Row> filteredRolledUpRDD = rolledUpRDD.filter(new Function<Row, Boolean>() {
        @Override
        public Boolean call(Row v1) throws Exception {
            // Keep only complete (clusterId, videoId, count) rows.
            return !(v1.size() != 3 || v1.isNullAt(0) || v1.isNullAt(1) || v1.isNullAt(2));
        }
    }).cache();
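    // Re-key rows as (clusterId -> (videoId, count)); hash-partitioning up front lets the
    // groupByKey below reuse the same partitioner instead of shuffling a second time.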
    JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterIdVideoIdCount = filteredRolledUpRDD
            .mapToPair(new PairFunction<Row, Integer, Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Tuple2<Integer, Integer>> call(Row row) throws Exception {
                    // count() comes back as a long; narrow it to int for the pair.
                    Tuple2<Integer, Integer> videoIdCount = new Tuple2<Integer, Integer>(row.getInt(1), (int) row.getLong(2));
                    return new Tuple2<Integer, Tuple2<Integer, Integer>>(row.getInt(0), videoIdCount);
                }
            })
            .partitionBy(new HashPartitioner(numPartitions))
            .persist(StorageLevel.MEMORY_ONLY_SER_2());
    //clusterIdVideoIdCount.saveAsTextFile(outdir + "/clusterid_videoid", GzipCodec.class);
    JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> groupedPair = clusterIdVideoIdCount.groupByKey(numPartitions).cache();
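    // Collapse each cluster's (videoId, count) pairs into one JSON array string per cluster.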
    JavaRDD<ClusterIdVideos> groupedFlat = groupedPair.map(new Function<Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>>, ClusterIdVideos>() {
        @Override
        public ClusterIdVideos call(Tuple2<Integer, Iterable<Tuple2<Integer, Integer>>> v1) throws Exception {
            ClusterIdVideos row = new ClusterIdVideos();
            Iterable<Tuple2<Integer, Integer>> videosCounts = v1._2();
            StringBuilder recosStr = new StringBuilder();
            recosStr.append("[");
            boolean appendComma = false;
            for (Tuple2<Integer, Integer> videoCount : videosCounts) {
                if (appendComma) recosStr.append(",");
                recosStr.append("{\"video_id\":");
                recosStr.append(videoCount._1());
                recosStr.append(",\"count\":");
                recosStr.append(videoCount._2());
                recosStr.append("}");
                appendComma = true;
            }
            recosStr.append("]");
            row.setClusterId(v1._1());
            row.setVideos(recosStr.toString());
            return row;
        }
    }).cache();
    //groupedFlat.saveAsTextFile(outdir + "/clusterid_videoid", GzipCodec.class);
    // Join the per-cluster JSON recommendations back onto each user's cluster assignment and write as Parquet.
    DataFrame groupedClusterId = sqlContext.createDataFrame(groupedFlat, ClusterIdVideos.class);
    DataFrame recosDf = clusterIdUserIdDF.join(groupedClusterId, "clusterId");
    recosDf.write().parquet(outdir + "/good");
}
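The gist does not include the two bean classes the method depends on. Below is a minimal, hypothetical sketch of their shape, inferred from the calls above: DmRating appears to extend MLlib's Rating (the code calls user() and product() on it), and both classes need JavaBean getters for SQLContext.createDataFrame's reflection to derive columns. Every name and type not used in the snippet itself is an assumption.

// Hypothetical sketch, not the original classes; shapes inferred from the snippet above.
// In practice each public class would live in its own file.
import java.io.Serializable;
import org.apache.spark.mllib.recommendation.Rating;

public class DmRating extends Rating implements Serializable {
    public String userId;        // site account id; may be null or empty (assumed public: read directly above)
    public String v1stId;        // first-visit (visitor) id, used as a fallback
    private int clusterId;
    private int videoId;
    private int redisId;
    private String v1stOrUserId;

    public DmRating(int user, int product, double rating) {
        super(user, product, rating);
    }

    // JavaBean accessors so createDataFrame can map fields to columns.
    public String getUserId() { return userId; }
    public void setUserId(String userId) { this.userId = userId; }
    public String getV1stId() { return v1stId; }
    public void setV1stId(String v1stId) { this.v1stId = v1stId; }
    public int getClusterId() { return clusterId; }
    public void setClusterId(int clusterId) { this.clusterId = clusterId; }
    public int getVideoId() { return videoId; }
    public void setVideoId(int videoId) { this.videoId = videoId; }
    public int getRedisId() { return redisId; }
    public void setRedisId(int redisId) { this.redisId = redisId; }
    public String getV1stOrUserId() { return v1stOrUserId; }
    public void setV1stOrUserId(String v1stOrUserId) { this.v1stOrUserId = v1stOrUserId; }
}

public class ClusterIdVideos implements Serializable {
    private int clusterId;
    private String videos;   // JSON array string built in saveAlsKMeansRecosAsParquet

    public int getClusterId() { return clusterId; }
    public void setClusterId(int clusterId) { this.clusterId = clusterId; }
    public String getVideos() { return videos; }
    public void setVideos(String videos) { this.videos = videos; }
}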