@frankscholten
Created September 12, 2014 13:44
Spark ItemSimilarity Java
package sparkexample;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import org.apache.mahout.drivers.DefaultElementReadSchema;
import org.apache.mahout.drivers.Schema;
import org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader;
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.MatrixSlice;
import org.apache.mahout.math.cf.SimilarityAnalysis;
import org.apache.mahout.math.drm.DistributedContext;
import org.apache.mahout.math.drm.DrmLike;
import org.apache.mahout.sparkbindings.SparkDistributedContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import scala.collection.JavaConverters;

import java.util.List;

/**
 * Mahout recommender which performs the following steps:
 *
 * 1. Configures Spark
 * 2. Reads user / item data
 * 3. Runs cooccurrence analysis
 * 4. TODO: Writes the results into ElasticSearch
 */
public class MahoutRecommenderMain {

    public static void main(String[] args) {
        // Configure Spark and wrap the context for Mahout's distributed operations
        SparkConf config = new SparkConf();
        config.setMaster("local");
        config.setAppName("Mahout Recommender");
        SparkContext sparkContext = new SparkContext(config);
        DistributedContext distributedContext = new SparkDistributedContext(sparkContext);

        // Read user / item data into a distributed row matrix (DRM)
        Schema schema = new DefaultElementReadSchema();
        TextDelimitedIndexedDatasetReader reader = new TextDelimitedIndexedDatasetReader(schema, distributedContext);
        DrmLike<Object> drmA = reader.readElementsFrom("data/article_views.txt", HashBiMap.<String, Object>create()).matrix();

        // Run cooccurrence analysis; the first element holds the indicator matrix for drmA
        scala.collection.immutable.List<DrmLike<Object>> cooccurrences =
                SimilarityAnalysis.cooccurrences(drmA, 0xdeadbeef, Integer.MAX_VALUE, Integer.MAX_VALUE, null);
        List<DrmLike<Object>> indicatorMatrices = JavaConverters.asJavaListConverter(cooccurrences).asJava();
        DrmLike<Object> indicatorMatrix = indicatorMatrices.get(0);
        Matrix matrix = indicatorMatrix.checkpoint(null).collect();

        // Print results
        for (MatrixSlice matrixSlice : matrix) {
            System.out.println(matrixSlice);
        }
    }
}
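
For reference, with a DefaultElementReadSchema the reader expects one interaction per line as delimited (row ID, column ID) pairs. Assuming that default, a hypothetical data/article_views.txt could look like:

user1,article1
user1,article2
user2,article1
user2,article3
user3,article2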
@pferrel commented Sep 12, 2014

Converted to Scala in https://gist.github.com/pferrel/9cfee8b5723bb2e2a22c

It uses the MahoutDriver and IndexedDataset, and as a bonus uses Spark's parallel writing to part files (text, of course).
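
For readers unfamiliar with the part-file writing mentioned here: Spark writes each partition of an RDD as its own part file in parallel. A minimal self-contained sketch using core Spark's Java API (the output path and data are hypothetical, not taken from pferrel's gist):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PartFileWriteSketch {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("PartFileWriteSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Two partitions are written in parallel as out/indicators/part-00000 and part-00001
        sc.parallelize(Arrays.asList("article1\tarticle2:0.9", "article2\tarticle1:0.9"), 2)
          .saveAsTextFile("out/indicators");

        sc.stop();
    }
}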

@eshanhaval

Could you please share a link where I can download the jars you used in this code?

@amaebi commented Jan 6, 2015

Hi Frank, were you able to resolve the issue you were facing with this code regarding:

NotSerializableException: org.apache.mahout.math.DenseVector

I am experiencing the same issue with both the Java-based and the Scala code:

java.io.NotSerializableException: org.apache.mahout.math.DenseVector
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Thanks
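
A likely cause (an assumption, not something confirmed in this thread): Mahout's vectors and matrices are not java.io.Serializable, so Spark's default Java serializer cannot ship them between executors; Mahout's Spark bindings normally configure Kryo instead. When creating the SparkContext by hand, as in the gist above, the settings can be added to the SparkConf. A sketch, with the registrator class name taken from Mahout's sparkbindings (verify it matches your Mahout version):

// Configure Kryo so Mahout math objects can be serialized by Spark executors
SparkConf config = new SparkConf();
config.setMaster("local");
config.setAppName("Mahout Recommender");
config.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
config.set("spark.kryo.registrator", "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator");
SparkContext sparkContext = new SparkContext(config);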

@pratikshya

Hi,

I included the math-scala jar, since the math jar was missing many of the functions required for this to work. When I run the program, I get:

java.lang.NoSuchMethodError: org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader.readElementsFrom(Ljava/lang/String;Lcom/google/common/collect/BiMap;)Lorg/apache/mahout/sparkbindings/indexeddataset/IndexedDatasetSpark;

It seems the error occurs because the Scala reader is being invoked from Java.

Any thoughts on how I could solve this? Is there a class I can use from Java instead of Scala for the similarity calculation?
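
One generic way to debug a NoSuchMethodError like this (a standard JVM technique, not specific to Mahout) is to print which jar actually supplied the class at runtime, which exposes classpath version mismatches:

// Prints the jar file that TextDelimitedIndexedDatasetReader was loaded from
System.out.println(
        org.apache.mahout.drivers.TextDelimitedIndexedDatasetReader.class
                .getProtectionDomain().getCodeSource().getLocation());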

@rohanar commented Apr 16, 2015

Can you please tell me which Maven dependencies are needed? I have tried the following:

  1. mahout-mr-0.10.0
  2. spark-core-2.11-1.3.0
  3. mahout-spark-2.10-0.10.0
  4. mahout-math-0.10.0

but there are still missing classes....
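
One thing that stands out in this list (an observation, not a verified fix): spark-core is a Scala 2.11 build while mahout-spark is a Scala 2.10 build, and mixing Scala binary versions on one classpath commonly leads to missing classes. A hedged pom.xml sketch with consistent _2.10 artifacts; the Spark version is an assumption, so check which Spark release your Mahout build targets:

<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math</artifactId>
    <version>0.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math-scala_2.10</artifactId>
    <version>0.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-spark_2.10</artifactId>
    <version>0.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>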

@ghost commented Nov 16, 2015

I am getting the same error as @amaebi:
NotSerializableException: org.apache.mahout.math.DenseVector
