Last active
August 29, 2015 14:23
-
-
Save hantuzun/1403052bfeb57020ef21 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(require '[sparkling.conf :as conf]) | |
(require '[sparkling.core :as sparkling]) | |
(require '[sparkling.destructuring :as destructuring]) | |
(import '[org.apache.spark.mllib.linalg.distributed RowMatrix]) | |
(import '[org.apache.spark.mllib.linalg Vectors]) | |
(import '[org.apache.spark HashPartitioner]) | |
(def spark-config
  "Spark configuration for this session: master `local`, application name `item similarities`."
  (let [base-conf   (conf/spark-conf)
        with-master (conf/master base-conf "local")]
    (conf/app-name with-master "item similarities")))

(def sc
  "Spark context created from `spark-config`; every RDD below is built on it."
  (sparkling/spark-context spark-config))
(def features
  "Records read with `.objectFile` from `/tmp/models/account/space1/type1/pf`.
  They are taken apart with `destructuring/key` / `destructuring/value` below, so they
  are presumably (item-id, feature-row) tuples -- TODO confirm against the writer."
  (.objectFile sc (str "/tmp/models/account/space1/type1" "/pf")))

(def features-values
  "The feature rows of `features` with their ids dropped."
  (sparkling/map destructuring/value features))
(defn- transform-element
  "Takes a `#sparkling/tuple [row row-index]` (a `features-values` row zipped with its
  index) and returns one `#sparkling/tuple [column-index (row-index number)]` for each
  element of the row."
  [t]
  (let [row       (destructuring/key t)
        row-index (destructuring/value t)]
    (map-indexed (fn [column-index number]
                   (sparkling/tuple column-index (list row-index number)))
                 (into [] row))))
(def indexed
  "Every cell of the feature matrix as a `#sparkling/tuple [column-index (row-index number)]`."
  (->> (.zipWithIndex features-values)
       (sparkling/flat-map-to-pair transform-element)))
(def sorted-by-column
  "An RDD with one element per feature column (`rank` elements), ordered by column
  index. Each element holds that column's `(row-index number)` pairs, e.g.
  `#<IterableWrapper [(0 0.05773111432790756), (1 0.2511915862560272), ..., (1681 0.5747470259666443)]>`.
  NOTE: `group-by-key` does not guarantee the order of the values inside a group, so
  consumers must sort each element by row-index themselves."
  (->> indexed
       (sparkling/group-by-key)
       (sparkling/sort-by-key)
       (sparkling/values)))
(defn- vector-string
  "Renders a sequence of numbers as a `Vectors/parse`-compatible string,
  e.g. `[1 2 3]` -> `\"[1,2,3]\"`."
  [v]
  (str "[" (apply str (interpose "," v)) "]"))
(defn- iterable-wrapper-to-vectors
  "Converts one `sorted-by-column` element (the `(row-index number)` pairs of a single
  column) into a dense MLlib vector of the numbers ordered by row-index.
  The pairs are sorted first because `group-by-key` does not guarantee the order of
  the values within a group; without this the vector entries could be permuted."
  [iterable-wrapper]
  (->> (seq iterable-wrapper)
       (sort-by first)
       (map second)
       (double-array)
       ;; build the vector directly instead of round-tripping through a string
       (Vectors/dense)))
(def vectors
  "The feature matrix's columns as a Scala `RDD` of `rank` dense vectors, e.g.
  `#<DenseVector [0.05773111432790756,0.2511915862560272, ..., 0.5747470259666443]>`."
  (->> sorted-by-column
       (sparkling/map iterable-wrapper-to-vectors)
       (.rdd)))
(def transposed
  "A `RowMatrix` over `vectors`: `rank` rows and one column per `features` element,
  i.e. the transpose of the feature matrix."
  (new RowMatrix vectors))
(def threshold
  "Intended similarity threshold for `columnSimilarities`.
  NOTE(review): this value is not passed to `columnSimilarities` below, so it currently
  has no effect. `(.columnSimilarities transposed threshold)` would switch to Spark's
  sampling-based approximate similarities -- confirm whether that was intended."
  0.1)

(def similarities
  "A JavaRDD of the `MatrixEntry`s produced by `columnSimilarities`: the cosine
  similarities between the columns of `transposed` (i.e. between items), stored as an
  upper-triangular matrix (i < j), e.g.
  `#<MatrixEntry MatrixEntry(876,977,0.7676699720171842)>`.
  Holds at most `(features.count * (features.count - 1)) / 2` entries."
  (.toJavaRDD (.entries (.columnSimilarities transposed))))
(defn- matrix-entry-to-map
  "Turns a `MatrixEntry` into a `#sparkling/tuple [[i j] value]`, e.g.
  `#<MatrixEntry MatrixEntry(876,977,0.7676699720171842)>` ->
  `#sparkling/tuple [[876 977] 0.7676699720171842]`.
  The `[i j]` vector is the key later passed to `.lookup`."
  ;; fully-qualified hint avoids a reflective call per entry without a new import
  [^org.apache.spark.mllib.linalg.distributed.MatrixEntry matrix-entry]
  (sparkling/tuple [(.i matrix-entry) (.j matrix-entry)] (.value matrix-entry)))
(def similarities-map
  "A JavaPairRDD keyed by `[i j]` (item indices, i < j) whose values are the cosine
  similarities, e.g. `#sparkling/tuple [[876 977] 0.7676699720171842]`.
  It is hash-partitioned so that `.lookup` only searches the partition the key maps
  to, and cached so partitions computed once stay in memory for later lookups.
  `(.lookup similarities-map [876 977])` returns `(0.7676699720171842)`, or an empty
  list when the pair has no entry."
  (let [num-partitions 100
        pairs          (sparkling/map-to-pair matrix-entry-to-map similarities)]
    (.cache (.partitionBy pairs (HashPartitioner. num-partitions)))))
;; Warm-up: issue one throwaway lookup so the RDD pipeline behind `similarities-map`
;; is computed before the first real `similarity` call.
(-> similarities-map (.lookup [0 0]))
(def item-index-to-id
  "Despite its name, maps each item id (the keys of `features`) to the index of its row
  in `features-values`, which is also its column index in `transposed`, e.g.
  `{1052601967 1252, ..., 955408773 1219}`.
  Indices follow `collect` order, which presumably matches the `.zipWithIndex` order
  used to build `indexed`, since both follow the partition order of `features`."
  (zipmap (sparkling/collect (sparkling/map destructuring/key features))
          (range)))
(defn- similarity-helper
  "Looks up the similarity of two item ids in `similarities-map`.
  Returns a list holding the double score, e.g. `(0.7676699720171842)`, or an empty
  list when there is no entry.
  The ids may be given in either order: `columnSimilarities` only stores entries with
  i < j, so the lookup key is ordered by the items' matrix indices (not by the ids).
  Unknown ids and identical ids have no entry and yield an empty list."
  [itemid-1 itemid-2]
  (let [idx-1 (get item-index-to-id itemid-1)
        idx-2 (get item-index-to-id itemid-2)]
    (if (and idx-1 idx-2)
      (if (< idx-1 idx-2)
        (.lookup similarities-map [idx-1 idx-2])
        (.lookup similarities-map [idx-2 idx-1]))
      ())))
(defn similarity
  "Returns the cosine similarity of two item ids (the keys of `features`) as a double.
  Returns 1.0 when the ids are equal, and 0.0 when there is no stored score (unknown
  id, or pair absent from `similarities-map`).
  The pair is ordered by the items' matrix indices rather than by the ids themselves:
  `columnSimilarities` only stores entries with i < j, and id order does not follow
  index order."
  [itemid-1 itemid-2]
  (if (= itemid-1 itemid-2)
    1.0
    (let [idx-1 (get item-index-to-id itemid-1)
          idx-2 (get item-index-to-id itemid-2)]
      (if (and idx-1 idx-2)
        (let [[lo hi] (if (< idx-1 idx-2)
                        [itemid-1 itemid-2]
                        [itemid-2 itemid-1])]
          (or (first (similarity-helper lo hi)) 0.0))
        0.0))))
;; Example: time a single similarity lookup for two item ids.
(let [item-a 816964272
      item-b 1705744326]
  (time (similarity item-a item-b)))
;; Output observed when this was run:
;; "Elapsed time: 108.19738 msecs"
;; 0.11980170255885798
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment