Skip to content

Instantly share code, notes, and snippets.

@Kornel
Created October 7, 2015 13:10
Show Gist options
  • Save Kornel/ba39411fdd6a27ddb33e to your computer and use it in GitHub Desktop.
Save Kornel/ba39411fdd6a27ddb33e to your computer and use it in GitHub Desktop.
package com.allegrogroup.reco.analyzer.spark.session
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
 * Item-to-item similarity derived from co-occurrence in sessions.
 *
 * Input is a comma-separated text file whose first column is a session id and whose second
 * column is an item id (further columns are ignored). Two items are scored with the Jaccard
 * index of the sets of sessions they appear in:
 *
 * {{{
 *   sim(A, B) = |S(A) intersect S(B)| / (|S(A)| + |S(B)| - |S(A) intersect S(B)|)
 * }}}
 *
 * where S(X) is the set of distinct sessions containing item X.
 */
class SessionsSimilarity {

  import SessionsSimilarity._

  /**
   * Builds the item-pair similarity RDD.
   *
   * Only pairs that co-occur in at least one session are emitted, so every score is in (0, 1].
   * Each pair appears once as `(itemA, itemB, score)` with `itemA < itemB` in `String` ordering.
   * The result is lazy; the input is read when an action runs. The per-session item sets are
   * used by two branches and are not persisted here, so each action re-reads the input —
   * callers running several actions may want to `persist()` the result.
   *
   * @param sc        Spark context used to read the input
   * @param inputPath location of the `sessionId,itemId[,...]` text input
   * @return `(itemA, itemB, jaccardSimilarity)` triples
   */
  def run(sc: SparkContext, inputPath: String = DefaultInputPath): RDD[(String, String, Double)] = {
    // Lines with fewer than two columns are dropped rather than failing the job.
    val events = sc.textFile(inputPath).flatMap(line => parseLine(line).toList)

    // Distinct items per session. Deduplicating here makes BOTH the pair counts and the
    // per-item counts "number of sessions", which is what the Jaccard formula needs.
    val sessionItems = events
      .map(e => (e.sessionId, e.itemId))
      .aggregateByKey(Set.empty[String])(_ + _, _ ++ _)
      .values

    // |S(X)|: number of sessions containing each item.
    val sessionsPerItem = sessionItems
      .flatMap(items => items)
      .map(item => (item, 1))
      .reduceByKey(_ + _)

    // |S(A) intersect S(B)|: number of sessions containing both items.
    val coOccurrences = sessionItems
      .flatMap(items => orderedPairs(items))
      .map(pair => (pair, 1))
      .reduceByKey(_ + _)

    // Attach |S(A)| via a join keyed on itemA, then re-key on itemB to attach |S(B)|.
    coOccurrences
      .map { case ((itemA, itemB), countAB) => (itemA, (itemB, countAB)) }
      .join(sessionsPerItem)
      .map { case (itemA, ((itemB, countAB), countA)) => (itemB, (itemA, countAB, countA)) }
      .join(sessionsPerItem)
      .map { case (itemB, ((itemA, countAB, countA), countB)) =>
        (itemA, itemB, jaccard(countAB, countA, countB))
      }
  }
}

/** Data types and pure helpers for [[SessionsSimilarity]]. */
object SessionsSimilarity {

  /** Input location used when `run` is called without an explicit path. */
  val DefaultInputPath: String = "/projects/reco_prod/data/sessions-cf/first"

  /**
   * One input row: an item seen in a session.
   *
   * Declared in the companion object, not inside the class. An inner case class keeps a
   * reference to its enclosing instance, so building it inside a Spark closure would force the
   * non-serializable `SessionsSimilarity` instance to be serialized with the task.
   */
  final case class SessionEvent(sessionId: String, itemId: String)

  /**
   * Parses one `sessionId,itemId[,...]` line.
   *
   * @return the event, or `None` when the line has fewer than two comma-separated columns
   *         (e.g. a blank line); columns beyond the second are ignored
   */
  private[session] def parseLine(line: String): Option[SessionEvent] =
    line.split(',') match {
      case Array(sessionId, itemId, _*) => Some(SessionEvent(sessionId, itemId))
      case _                            => None
    }

  /**
   * Every unordered pair of distinct items, emitted once as `(a, b)` with `a < b`.
   *
   * Yields k*(k-1)/2 pairs for k items, so unusually large sessions dominate the cost.
   */
  private[session] def orderedPairs(items: Set[String]): Iterator[(String, String)] = {
    val sorted = items.toVector.sorted
    for {
      i <- sorted.indices.iterator
      j <- (i + 1 until sorted.length).iterator
    } yield (sorted(i), sorted(j))
  }

  /**
   * Jaccard index computed from the intersection size and the two set sizes.
   * Callers guarantee `intersection >= 1` and `sizeA, sizeB >= intersection`, so the
   * denominator is positive.
   */
  private[session] def jaccard(intersection: Int, sizeA: Int, sizeB: Int): Double =
    intersection.toDouble / (sizeA + sizeB - intersection)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment