Created
December 4, 2015 02:14
-
-
Save ramv-dailymotion/9e3881125442b934cf49 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static JavaRDD<String> getJsonUserIdVideoIdRDD(JavaRDD<Rating> cachedRating, | |
JavaPairRDD<Integer, Integer> userIdClusterId, | |
int numPartitions, String outDir){ | |
/* | |
convert the JavaRDD<Rating> to JavaPairRDD<Integer,DmRating> | |
*/ | |
JavaPairRDD<Integer,DmRating> userIdDmRating = cachedRating.mapToPair(new PairFunction<Rating, Integer, DmRating>() { | |
public Tuple2<Integer, DmRating> call(Rating dmRating) throws Exception { | |
return new Tuple2<>(dmRating.user(), (DmRating)dmRating); | |
} | |
}); | |
/* | |
join this RDD with userIdClusterID RDD by key | |
*/ | |
JavaPairRDD<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating = userIdClusterId.join(userIdDmRating, numPartitions); | |
// extract the clusterId to videoId map | |
JavaPairRDD<Integer, Integer> clusterIdVideoId = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer,DmRating>>, Integer, Integer>() { | |
public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userIdDmRatingClusterId ) throws Exception { | |
Integer userId = userIdDmRatingClusterId._1(); | |
Tuple2<Integer, DmRating> dmRatingClusterId = userIdDmRatingClusterId._2(); | |
return new Tuple2<Integer, Integer>(dmRatingClusterId._1(), dmRatingClusterId._2().product()); | |
} | |
}); | |
////// | |
/// Count the popularity of a video in a cluster | |
JavaPairRDD<String, Integer> clusterIdVideoIdStrInt = clusterIdVideoId.mapToPair(new PairFunction<Tuple2<Integer, Integer>, String, Integer>() { | |
@Override | |
public Tuple2<String, Integer> call(Tuple2<Integer, Integer> videoIdClusterId) throws Exception { | |
return new Tuple2<>(String.format("%d:%d", videoIdClusterId._1(), videoIdClusterId._2()), 1); | |
} | |
}); | |
JavaPairRDD<String, Integer> clusterIdVideoIdStrCount = clusterIdVideoIdStrInt.reduceByKey(new Function2<Integer, Integer, Integer>() { | |
@Override | |
public Integer call(Integer v1, Integer v2) throws Exception { | |
return v1+v2; | |
} | |
}); | |
/// | |
JavaPairRDD<Integer, Tuple2<Integer, Integer>> clusterId_T_videoIdCount = clusterIdVideoIdStrCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, Tuple2<Integer, Integer>>() { | |
@Override | |
public Tuple2<Integer, Tuple2<Integer, Integer>> call(Tuple2<String, Integer> clusterIdVideoIdStrCount) throws Exception { | |
String[] splits = clusterIdVideoIdStrCount._1().split(":"); | |
try{ | |
if(splits.length==2){ | |
int clusterId = Integer.parseInt(splits[0]); | |
int videoId = Integer.parseInt(splits[1]); | |
return new Tuple2<>(clusterId, new Tuple2<>(videoId, clusterIdVideoIdStrCount._2())); | |
}else{ | |
//Should never occur | |
LOGGER.error("Could not split {} into two with : as the separator!", clusterIdVideoIdStrCount._1()); | |
} | |
}catch (NumberFormatException ex){ | |
LOGGER.error(ex.getMessage()); | |
} | |
return new Tuple2<>(-1, new Tuple2<>(-1,-1)); | |
} | |
}); | |
JavaPairRDD<Integer, Iterable<Tuple2<Integer, Integer>>> clusterIdVideoIdGrouped = clusterId_T_videoIdCount.groupByKey(); | |
JavaPairRDD<Integer, DmRating> clusterIdDmRating = userId_T_clusterIdDmRating.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<Integer, DmRating>>, Integer, DmRating>() { | |
@Override | |
public Tuple2<Integer, DmRating> call(Tuple2<Integer, Tuple2<Integer, DmRating>> userId_T_clusterIdDmRating) throws Exception { | |
return userId_T_clusterIdDmRating._2(); | |
} | |
}); | |
JavaPairRDD<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> clusterId_T_DmRatingVideoIds = clusterIdDmRating.join(clusterIdVideoIdGrouped, numPartitions); | |
JavaPairRDD<Integer, String> userIdStringRDD = clusterId_T_DmRatingVideoIds.mapToPair(new PairFunction<Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>>, Integer, String>() { | |
@Override | |
public Tuple2<Integer, String> call(Tuple2<Integer, Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>>> v1) throws Exception { | |
int clusterId = v1._1(); | |
Tuple2<DmRating, Iterable<Tuple2<Integer, Integer>>> tuple = v1._2(); | |
DmRating rating = tuple._1(); | |
Iterable<Tuple2<Integer, Integer>> videosCounts= tuple._2(); | |
StringBuilder recosStr = new StringBuilder(); | |
boolean appendComa = false; | |
for(Tuple2<Integer, Integer> videoCount : videosCounts){ | |
if(appendComa) recosStr.append(","); | |
recosStr.append("{"); | |
recosStr.append("\"video_id\":"); | |
recosStr.append(videoCount._1()); | |
recosStr.append(","); | |
recosStr.append("\"count\":"); | |
recosStr.append(videoCount._2()); | |
recosStr.append("}"); | |
appendComa = true; | |
} | |
String val = String.format("{\"user_id\":\"%s\",\"v1st\":\"%s\",\"redis_uid\":%s,\"cluster_id\": %d,\"recommendations\":[ %s ]}", rating.dmUserId, rating.dmV1stStr, rating.user(), clusterId, recosStr); | |
return new Tuple2<Integer, String>(rating.user(), val); | |
} | |
}); | |
JavaPairRDD<Integer, Iterable<String>> groupedRdd = userIdStringRDD.groupByKey(numPartitions); | |
JavaRDD<String> jsonStringRdd = groupedRdd.map(new Function<Tuple2<Integer, Iterable<String>>, String>() { | |
@Override | |
public String call(Tuple2<Integer, Iterable<String>> v1) throws Exception { | |
for(String str : v1._2()){ | |
return str; | |
} | |
LOGGER.error("Could not fetch a string from iterable so returning empty"); | |
return ""; | |
} | |
}); | |
//LOGGER.info("Number of items in RDD: {}", jsonStringRDD.count()); | |
//return jsonStringRDD.persist(StorageLevel.MEMORY_ONLY_SER_2()); | |
LOGGER.info("Repartitioning the data into {}", numPartitions ); | |
jsonStringRdd.cache().saveAsTextFile(outDir); | |
return jsonStringRdd; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment