Skip to content

Instantly share code, notes, and snippets.

@ramv-dailymotion
Created February 13, 2016 19:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ramv-dailymotion/38a32f379865e8ee5a58 to your computer and use it in GitHub Desktop.
Calling Mahout Spark SimilarityAnalysis from Java
/**
 * Runs Mahout cooccurrence analysis ({@code SimilarityAnalysis.cooccurrencesIDSs}) over
 * user/video interactions and writes the resulting indicator matrix as delimited text.
 *
 * <p>Each rating contributes one (userKey, productId) interaction, where the user key is
 * {@code getUserId()} or, when that is null/empty, {@code getV1stId()}.
 *
 * @param ratingJavaRDD    source ratings
 * @param javaSparkContext Spark context wrapped into a Mahout {@code SparkDistributedContext}
 * @param outdir           base output directory; the matrix is written under
 *                         {@code outdir + "/similarity_indicatorMatrix"}
 * @param modelName        not used by this method
 * @return the path the indicator matrix was written to
 * @throws Exception propagated from Spark/Mahout
 */
public static String similarityAnalysis(JavaRDD<DmRating> ratingJavaRDD,
                                        JavaSparkContext javaSparkContext,
                                        String outdir,
                                        String modelName)
        throws Exception {
    // Exactly one tuple per rating, so a plain map suffices; no need to buffer a partition.
    JavaRDD<Tuple2<String, String>> userIdVideoId =
            ratingJavaRDD.map(rating -> toUserVideoPair(rating));

    SparkDistributedContext sparkDistributedContext =
            new SparkDistributedContext(javaSparkContext.sc());
    // NOTE(review): count() runs its own Spark job, and the RDD is recomputed again below;
    // consider persisting userIdVideoId if the input is expensive to produce.
    LOGGER.info("The number of observations: {}", userIdVideoId.count());

    IndexedDataset dataset = IndexedDatasetSpark.apply(
            userIdVideoId.rdd(), scala.Option.empty(), sparkDistributedContext.sc());
    IndexedDataset[] datasets = {dataset};

    // Presumably (per Mahout's signature): randomSeed, maxInterestingItemsPerThing,
    // maxNumInteractions. Mahout returns a Scala List, hence the fully qualified type.
    scala.collection.immutable.List<IndexedDataset> indicatorMatrices =
            SimilarityAnalysis.cooccurrencesIDSs(datasets, 0xdeadbeef, 50, 500);

    String outputPath = outdir + "/similarity_indicatorMatrix";
    Schema writeSchema = similarityWriteSchema();
    // NOTE(review): every returned matrix is written to the same path. With a single input
    // dataset this is expected to be one matrix; confirm before adding cross-cooccurrence inputs.
    scala.collection.Iterator<IndexedDataset> iter = indicatorMatrices.iterator();
    while (iter.hasNext()) {
        iter.next().dfsWrite(outputPath, writeSchema, sparkDistributedContext);
    }
    return outputPath;
}

/**
 * Maps a rating to a (userKey, productId) pair, falling back to {@code getV1stId()} when
 * {@code getUserId()} is null or empty.
 */
private static Tuple2<String, String> toUserVideoPair(DmRating rating) {
    String userId = rating.getUserId();
    if (userId == null || userId.isEmpty()) {
        // NOTE(review): getV1stId() may itself be null; confirm upstream guarantees a key.
        userId = rating.getV1stId();
    }
    return new Tuple2<String, String>(userId, Integer.toString(rating.product()));
}

/** Builds the tab/colon/comma text schema used when writing the indicator matrix. */
private static Schema similarityWriteSchema() {
    ArraySeq<Tuple2<String, Object>> schemaArgs = new ArraySeq<>(4);
    schemaArgs.update(0, new Tuple2<>("rowKeyDelim", "\t"));
    schemaArgs.update(1, new Tuple2<>("columnIdStrengthDelim", ":"));
    schemaArgs.update(2, new Tuple2<>("elementDelim", ","));
    schemaArgs.update(3, new Tuple2<>("omitScore", false));
    return new Schema(schemaArgs.toSeq());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment