Skip to content

Instantly share code, notes, and snippets.

@hantuzun
Last active August 29, 2015 14:23
Show Gist options
  • Save hantuzun/1403052bfeb57020ef21 to your computer and use it in GitHub Desktop.
Save hantuzun/1403052bfeb57020ef21 to your computer and use it in GitHub Desktop.
(require '[sparkling.conf :as conf])
(require '[sparkling.core :as sparkling])
(require '[sparkling.destructuring :as destructuring])
(import '[org.apache.spark.mllib.linalg.distributed RowMatrix])
(import '[org.apache.spark.mllib.linalg Vectors])
(import '[org.apache.spark HashPartitioner])
(def spark-config
  "Spark configuration for this script: master `local`, app name `item similarities`."
  (conf/app-name (conf/master (conf/spark-conf) "local")
                 "item similarities"))
(def sc
  "The Spark context built from `spark-config`; every RDD below is created through it."
  (-> spark-config
      sparkling/spark-context))
; The RDD loaded from the object file under the model directory.
; NOTE(review): presumably (item id, feature row) pairs, since the code below
; reads it with destructuring/key and destructuring/value -- confirm.
(def features
  (-> sc
      (.objectFile (str "/tmp/models/account/space1/type1" "/pf"))))
(def features-values
  "features without ids: only the value part of every element of `features`"
  (->> features
       (sparkling/map destructuring/value)))
(defn- transform-element
  "Transforms one features-values row, zipped with its index, into a sequence of
  `#sparkling/tuple [column-index (row-index number)]`, one tuple per number.

  `t` is the zipped element: its key is the row and its value is the row index."
  [t]
  (let [row (destructuring/key t)
        row-index (destructuring/value t)]
    ; Key every number by its column so that grouping by key later collects
    ; all values of one column together.
    (map-indexed (fn [column-index number]
                   (sparkling/tuple column-index (list row-index number)))
                 (into [] row))))
(def indexed
  "a pair RDD of `#sparkling/tuple [column-index (row-index number)]`:
  every row of features-values is zipped with its index and then flattened
  into one tuple per number by `transform-element`"
  (->> (.zipWithIndex features-values)
       (sparkling/flat-map-to-pair transform-element)))
(def sorted-by-column
  "the grouped values of `indexed`, ordered by column index; an RDD of `rank`
  elements like the following:
  `#<IterableWrapper [(0 0.05773111432790756), (1 0.2511915862560272), ..., (1681 0.5747470259666443)]>`"
  (-> indexed
      sparkling/group-by-key
      sparkling/sort-by-key
      sparkling/values))
(defn- vector-string
  "Converts a (possibly lazy) sequence `v` to a `Vectors/parse` compatible
  string representation: the elements joined by commas and wrapped in square
  brackets, e.g. `[1,2,3]`. An empty sequence gives `[]`."
  [v]
  (str "[" (apply str (interpose "," v)) "]"))
(defn- iterable-wrapper-to-vectors
  "Converts one sorted-by-column element, an iterable of `(row-index number)`
  pairs, to a dense `Vectors` instance holding the numbers.

  The pairs are sorted by row index first, because grouping by key does not
  promise any order inside a group; without the sort, position k of the
  resulting vector is not guaranteed to belong to row k in every column."
  [iterable-wrapper]
  (let [ordered-pairs (sort-by first (seq iterable-wrapper))]
    ; Build the vector directly from the numbers instead of formatting them
    ; into a string and parsing that string back.
    (Vectors/dense (double-array (map second ordered-pairs)))))
(def vectors
  "the underlying Scala RDD of `rank` vectors, one per element of
  sorted-by-column, like the following:
  `#<DenseVector [0.05773111432790756,0.2511915862560272, ..., 0.5747470259666443]>`"
  (->> sorted-by-column
       (sparkling/map iterable-wrapper-to-vectors)
       .rdd))
(def transposed
  "a `RowMatrix` built from `vectors`; almost like the transpose of features:
  it has `rank` rows and features.count columns"
  (new RowMatrix vectors))
; NOTE(review): threshold is not referenced anywhere in the visible code;
; columnSimilarities below is called without it -- confirm whether that is intended.
(def threshold 0.1)
(def similarities
  "the column similarity entries of `transposed` as a Java RDD; it has almost
  `(features.count * (features.count - 1)) / 2` elements like the following:
  `#<MatrixEntry MatrixEntry(876,977,0.7676699720171842)>`"
  (-> transposed
      .columnSimilarities
      .entries
      .toJavaRDD))
(defn- matrix-entry-to-map
  "Maps a matrix entry to a sparkling tuple keyed by its `[i j]` position:
  `#<MatrixEntry MatrixEntry(876,977,0.7676699720171842)>` becomes
  `#sparkling/tuple [[876 977] 0.7676699720171842]`."
  [matrix-entry]
  (sparkling/tuple [(.i matrix-entry) (.j matrix-entry)]
                   (.value matrix-entry)))
(def similarities-map
  "a JavaPairRDD with almost `(features.count * (features.count - 1)) / 2`
  elements like the following:
  `#sparkling/tuple [[876 977] 0.7676699720171842]`

  `(.lookup similarities-map [876 977])` gives a double list
  `(0.7676699720171842)` or an empty list `()`.

  The pairs are partitioned by a specific partitioner (a `HashPartitioner`)
  in order to speed up lookups."
  ; Defined once: the pair RDD is built and partitioned in a single expression
  ; instead of redefining the var on top of its own previous value.
  (let [num-partitions 100
        similarity-pairs (sparkling/map-to-pair matrix-entry-to-map similarities)]
    (.partitionBy similarity-pairs (HashPartitioner. num-partitions))))
; Warm up the map: run one lookup up front and discard its result.
; NOTE(review): presumably this makes the first job (e.g. the partitioning
; shuffle) happen here rather than during the first real query -- confirm.
(.lookup similarities-map [0 0])
(def item-index-to-id
  "maps every itemid hash id (the key part of `features`) to its position in
  the collected key list, i.e. item id -> index (despite the var's name):
  {1052601967 1252, ..., 955408773 1219}

  NOTE(review): these positions are assumed to agree with the row indices
  that `.zipWithIndex` assigns to features-values -- confirm."
  (let [item-ids (sparkling/collect (sparkling/map destructuring/key features))]
    ; Pair each id with 0, 1, 2, ... in collection order.
    (zipmap item-ids (range))))
(defn- similarity-helper
  "Takes two hash item ids and returns a double list `(0.7676699720171842)`
  or an empty list `()`.

  Both ids are translated to indices through `item-index-to-id`. The lookup
  key is always `[smaller-index larger-index]`: column similarities are stored
  as upper-triangular entries (i < j), and the order of two item ids says
  nothing about the order of their indices. The ids may therefore be given in
  either order.

  - returns an empty list when either id is unknown
  - there's no score in case itemid-1 is equal to itemid-2"
  [itemid-1 itemid-2]
  (let [index-1 (get item-index-to-id itemid-1)
        index-2 (get item-index-to-id itemid-2)]
    (if (and index-1 index-2)
      (.lookup similarities-map [(min index-1 index-2) (max index-1 index-2)])
      ())))
(defn similarity
  "Takes two hash item ids and returns their similarity score as a double.

  - equal ids give 1.0
  - ids with no stored score (or unknown ids) give 0.0"
  [itemid-1 itemid-2]
  (if (= itemid-1 itemid-2)
    1.0
    ; The helper orders the lookup key itself, so no id comparison is needed.
    (or (first (similarity-helper itemid-1 itemid-2)) 0.0)))
; Example query, timed; the two comment lines below record the output of one run.
(time (similarity 816964272 1705744326))
; "Elapsed time: 108.19738 msecs"
; 0.11980170255885798
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment