Created June 19, 2018 15:16
-
-
Save drewnoff/477f23b27a8fde87175e29aef515a11d to your computer and use it in GitHub Desktop.
Scala RDD example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Value-class wrapper around a raw site identifier string.
  *
  * `toString` yields the bare identifier (not `SiteId(...)`), so the id can be
  * interpolated directly into output.
  */
final case class SiteId(v: String) extends AnyVal {
  override def toString: String = v
}
/** Value-class wrapper around a raw item identifier string.
  *
  * `toString` yields the bare identifier (not `ItemId(...)`), so the id can be
  * interpolated directly into output.
  */
final case class ItemId(v: String) extends AnyVal {
  override def toString: String = v
}
/** A single item listing.
  *
  * @param siteId site the item belongs to
  * @param itemId identifier of the item on that site
  * @param price  price, if known (`None` when absent)
  */
case class Item(
    siteId: SiteId,
    itemId: ItemId,
    price: Option[Double]
)
/** For every (site, item) pair that is available right now, finds the earliest
  * timestamp at which that pair appears in `archive`.
  *
  * Items are matched on `(siteId, itemId)` only; `price` does not participate.
  *
  * @param archive      timestamped item observations; a pair may occur many times
  * @param availableNow availability flag per item; only entries flagged `true` count
  * @param parallel     number of partitions used for the `reduceByKey` and `join` shuffles
  * @return `(siteId, itemId, firstSeenAt)` for each pair present in `archive` and flagged
  *         `true` in `availableNow` (one row per matching `true` entry in `availableNow`)
  */
def getFirstOccurrence(archive: RDD[(Timestamp, Item)], availableNow: RDD[(Item, Boolean)])(
    implicit parallel: Int): RDD[(SiteId, ItemId, Timestamp)] = {
  // Picks the earlier of two timestamps (ties resolve to the second argument).
  val earlier: (Timestamp, Timestamp) => Timestamp =
    (tsA, tsB) => if (tsA.toMillis < tsB.toMillis) tsA else tsB

  // Earliest archive timestamp per (site, item) key.
  val firstOccurrence = archive
    .map { case (existAt, item) => ((item.siteId, item.itemId), existAt) }
    .reduceByKey(earlier, parallel)

  // Dropping unavailable entries before the join yields the same rows as filtering
  // after it (the predicate only looks at the right-hand value) but shuffles less data.
  val availableKeys = availableNow
    .filter { case (_, isAvailable) => isAvailable }
    .map { case (item, isAvailable) => ((item.siteId, item.itemId), isAvailable) }

  // This expression is the method's result; binding it to a trailing `val` would make
  // the method return Unit and silently discard the RDD.
  firstOccurrence
    .join(availableKeys, parallel)
    .map { case ((siteId, itemId), (createdAt, _)) => (siteId, itemId, createdAt) }
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.