Skip to content

Instantly share code, notes, and snippets.

@jerluc
Created July 27, 2012 19:21
Show Gist options
  • Save jerluc/3189968 to your computer and use it in GitHub Desktop.
Save jerluc/3189968 to your computer and use it in GitHub Desktop.
Brand Recommendation Prototype Using Scalding
import com.twitter.scalding._
import scala.util.matching.Regex
/**
 * Scalding job that scores how strongly pairs of brands are correlated, based on
 * the users who interacted with both brands.
 *
 * Input  (`--input`):  TSV with columns `userId`, `brandId`, `interactions`.
 * Output (`--output`): TSV with one row per brand pair (`brandId` < `brandId2`)
 *                      holding the aggregated statistics and a `correlation` column.
 *
 * The correlation is Pearson's coefficient computed over the interaction counts
 * of the users shared by the two brands.
 */
class BrandRecommandation(args : Args) extends Job(args) {

  // Raw (userId, brandId, interactions) rows.
  val brandData = Tsv(args("input"), ('userId, 'brandId, 'interactions))

  // Number of input rows per brand.
  val brandIncidence = brandData.groupBy('brandId) { _.size } rename { 'size -> 'brandIncidence }

  // Every input row annotated with the incidence of its brand.
  val brandDataWithIncidence = brandData.joinWithSmaller('brandId -> 'brandId, brandIncidence)

  // Same data with every field suffixed by "2" so it can be self-joined on user.
  val brandDataWithIncidence2 = brandDataWithIncidence.rename(
    ('userId, 'brandId, 'interactions, 'brandIncidence) ->
      ('userId2, 'brandId2, 'interactions2, 'brandIncidence2))

  val correlations =
    brandDataWithIncidence
      // Pair up every two brands that the same user interacted with.
      .joinWithSmaller('userId -> 'userId2, brandDataWithIncidence2)
      // Keep each unordered pair once and drop self-pairs.
      // NOTE(review): brand ids are read as Double here, so they are assumed to be numeric - confirm.
      .filter('brandId, 'brandId2) { brands: (Double, Double) => brands._1 < brands._2 }
      .project(('brandId, 'interactions, 'brandIncidence, 'brandId2, 'interactions2, 'brandIncidence2))
      .map(('interactions, 'interactions2) -> ('interactionsProd, 'interactionsSq, 'interactions2Sq)) {
        interactions: (Int, Int) =>
          // Promote to Double before multiplying: an Int * Int product silently
          // overflows for large interaction counts.
          (interactions._1.toDouble * interactions._2,
            math.pow(interactions._1, 2),
            math.pow(interactions._2, 2))
      }
      // Per brand pair, accumulate the sums needed by Pearson's formula.
      .groupBy('brandId, 'brandId2) { group =>
        group.size
          .sum('interactionsProd -> 'dotProduct)
          .sum('interactions -> 'interactionsSum)
          .sum('interactions2 -> 'interactions2Sum)
          .sum('interactionsSq -> 'interactionsNormSq)
          .sum('interactions2Sq -> 'interactions2NormSq)
          // Incidence is constant per brand, so max simply carries it through the grouping.
          .max('brandIncidence)
          .max('brandIncidence2)
      }
      .map(('size, 'dotProduct, 'interactionsSum, 'interactions2Sum, 'interactionsNormSq, 'interactions2NormSq) -> 'correlation) {
        fields: (Double, Double, Double, Double, Double, Double) =>
          correlation(fields._1, fields._2, fields._3, fields._4, fields._5, fields._6)
      }

  /**
   * Pearson correlation coefficient from pre-aggregated sums:
   *
   *   (n * Sxy - Sx * Sy) / (sqrt(n * Sxx - Sx^2) * sqrt(n * Syy - Sy^2))
   *
   * @param size                number of paired observations (n)
   * @param dotProduct          sum of x * y
   * @param interactionsSum     sum of x
   * @param interactions2Sum    sum of y
   * @param interactionsNormSq  sum of x^2
   * @param interactions2NormSq sum of y^2
   * @return the correlation; NaN when either series has zero variance (always the
   *         case for a single observation), because the denominator is then 0
   */
  def correlation(size: Double, dotProduct: Double, interactionsSum: Double, interactions2Sum: Double, interactionsNormSq: Double, interactions2NormSq: Double) = {
    val numerator = size * dotProduct - interactionsSum * interactions2Sum
    // The two square roots must be multiplied together BEFORE dividing;
    // `a / sqrt(x) * sqrt(y)` would evaluate as `(a / sqrt(x)) * sqrt(y)`.
    val denominator =
      math.sqrt(size * interactionsNormSq - interactionsSum * interactionsSum) *
        math.sqrt(size * interactions2NormSq - interactions2Sum * interactions2Sum)
    numerator / denominator
  }

  correlations.write(Tsv(args("output")))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment