Skip to content

Instantly share code, notes, and snippets.

@drewnoff
Created June 19, 2018 15:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drewnoff/477f23b27a8fde87175e29aef515a11d to your computer and use it in GitHub Desktop.
Save drewnoff/477f23b27a8fde87175e29aef515a11d to your computer and use it in GitHub Desktop.
Scala RDD example
/** Zero-overhead typed wrapper for a site identifier; renders as the wrapped string. */
final case class SiteId(v: String) extends AnyVal {
  override def toString: String = v.toString
}
/** Zero-overhead typed wrapper for an item identifier; renders as the wrapped string. */
final case class ItemId(v: String) extends AnyVal {
  override def toString: String = v.toString
}
/**
 * An item observed on a site.
 *
 * @param siteId identifier of the site the item belongs to
 * @param itemId identifier of the item
 * @param price  optional price (may be absent)
 */
case class Item(
  siteId: SiteId,
  itemId: ItemId,
  price: Option[Double]
)
/**
 * For every item that is currently available, finds the earliest timestamp at which that
 * item appears in `archive`.
 *
 * Items are matched on the `(siteId, itemId)` pair only; `price` is not part of the key.
 *
 * @param archive      items paired with the timestamp at which they were recorded
 * @param availableNow items paired with an availability flag; only `true` entries are kept
 * @param parallel     number of partitions for the `reduceByKey` and `join` shuffles;
 *                     must be positive
 * @return `(siteId, itemId, firstSeenAt)` for each available key that also occurs in
 *         `archive`. If `availableNow` holds several `true` entries with the same key
 *         (e.g. differing only in price), one row is produced per such entry.
 * @throws IllegalArgumentException if `parallel` is not positive
 */
def getFirstOccurrence(archive: RDD[(Timestamp, Item)], availableNow: RDD[(Item, Boolean)])(
    implicit parallel: Int): RDD[(SiteId, ItemId, Timestamp)] = {
  // A partition count of 0 would make the shuffles produce an empty RDD without any error.
  require(parallel > 0, s"parallel must be positive, got $parallel")

  // Keeps the earlier of two timestamps by `toMillis`; on a tie the second argument is kept.
  // NOTE(review): assumes `Timestamp.toMillis` is an orderable epoch value — confirm.
  val earlier = (tsA: Timestamp, tsB: Timestamp) => if (tsA.toMillis < tsB.toMillis) tsA else tsB

  val firstSeen = archive
    .map { case (seenAt, item) => ((item.siteId, item.itemId), seenAt) }
    .reduceByKey(earlier, parallel)

  // Filter before the join so unavailable rows are never shuffled; only the key is needed.
  val availableKeys = availableNow
    .filter { case (_, isAvailable) => isAvailable }
    .map { case (item, _) => ((item.siteId, item.itemId), ()) }

  // The last expression is the method's result (the original bound it to a val and returned Unit).
  firstSeen
    .join(availableKeys, parallel)
    .map { case ((siteId, itemId), (createdAt, _)) => (siteId, itemId, createdAt) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment