Skip to content

Instantly share code, notes, and snippets.

@alexy
Created April 29, 2012 19:36
Show Gist options
  • Save alexy/2552884 to your computer and use it in GitHub Desktop.
Save alexy/2552884 to your computer and use it in GitHub Desktop.
joinLeft differs from joinRight; both results are larger than expected, and ++ causes an NPE.
/** Set-difference job over two snapshots of ranker-pairs.
  *
  * Reads an "old" and a "new" ranker-pairs dataset (each a `DList` of
  * `(ranker, pairs)` records) and writes, per ranker, the pairs present in the
  * new snapshot but missing from the old one. A ranker with no old record
  * contributes all of its new pairs. When `--newly` is given, it also writes the
  * records of rankers that appear only in the new snapshot.
  */
object PairsNews {
  /** Ranker-pairs keyed by a `Long` ranker id, each holding `(Long, Long)` pairs. */
  type RankerPairs = DList[(Long, Iterable[(Long, Long)])]

  /** Entry point.
    *
    * `withHadoopArgs` hands the (presumably Hadoop-filtered) arguments to Scallop,
    * which declares:
    *  - `--date`, `--base`: declared but not read by this method.
    *  - `--globOld`: read the old path as `old/` + `*` and merge records per ranker.
    *  - `--added` (required): output path for pairs that are new relative to old.
    *  - `--newly` (optional): output path for rankers present only in the new snapshot.
    *  - trailing args: the new snapshot path (required), then an optional old path;
    *    without it the old path is `FileDay(new).previous`.
    *
    * @param a raw command-line arguments
    */
  def main(a: Array[String]) = withHadoopArgs(a) { case args =>
    case object o extends ScallopConf(args) {
      val date = opt[String]("date", descr = "generate pairs for new regs after that date")
      val allRankerPairsBase = opt[String]("base", descr = "directory where all ranker-pairs are stored, per day",
        default = Some("fatpipe/generated"))
      val globOld = opt[Boolean]("globOld", descr = "consider old as base for dates and read from all of them")
      val addedRankerPairsFile = opt[String]("added", descr = "compliment of the old state to get the new state",required=true)
      val newlyRankerPairsFile = opt[String]("newly", descr = "the pairs for the really newly registered rankers")
      val newRankerPairsFile = trailArg[String](required = true)
      // val totalRankerPairsFile = trailArg[String](required = true)
      val oldRankerPairsFileArg = trailArg[String](required = false)
      verify

      /** Old snapshot path: the explicit trailing argument when present, otherwise
        * `FileDay(new).previous` (presumably the preceding day's path — verify in FileDay).
        */
      def oldRankerPairsFile() =
        oldRankerPairsFileArg.get.getOrElse(FileDay(newRankerPairsFile()).previous)

      /** Human-readable summary of the inputs and outputs this run will use. */
      override def toString =
        "reading old pairs from " + oldRankerPairsFile() +
        "\n new pairs from " + newRankerPairsFile() +
        // "\n writing overall combined pairs to " + totalRankerPairsFile() +
        addedRankerPairsFile.get.map("\n writing added ranker-pairs to " + _).getOrElse("") +
        newlyRankerPairsFile.get.map("\n writing newly registered ranker-pairs to " + _).getOrElse("")
    }
    println("HelloKitty! Set-difference for Ranker-Pairs " + o)
    println(" options set: " + o.summary)

    // Old snapshot. With --globOld every entry under the old path is read, so one
    // ranker may appear in several records; group them and union their pair sets
    // so the join below sees a single old record per ranker.
    val oldRankerPairs =
      if (o.globOld()) {
        Pipeline.readRankerPairs(o.oldRankerPairsFile() + "/*")
          .map { case (r, psIt) => (r, psIt.toSet) }
          .groupByKey
          .combine((_: Set[Edge]) ++ (_: Set[Edge]))
          .map { case (r, ps) => (r, ps.toIterable) }
      } else Pipeline.readRankerPairs(o.oldRankerPairsFile())
    val newRankerPairs = Pipeline.readRankerPairs(o.newRankerPairsFile())

    // Alternative formulation via joinLeft, kept for comparison. Join rows are nested
    // pairs `(key, (left, right))`, so the patterns must be nested the same way:
    // val rpairsAdded: RankerPairs = joinLeft(newRankerPairs, oldRankerPairs).flatMap {
    //   case (r, (newPairs, Some(oldPairs))) =>
    //     val added = newPairs.toSet -- oldPairs
    //     if (added.isEmpty) None else Some((r, added.toIterable))
    //   case (r, (newPairs, None)) => Some((r, newPairs))
    // }

    // Pairs the new snapshot has that the old one lacks. The old side of each join row
    // is an Option: None means the ranker has no old record, so all new pairs count.
    // Rankers whose difference is empty are dropped.
    val rpairsAdded: RankerPairs = joinRight(oldRankerPairs, newRankerPairs).flatMap {
      case (r, (Some(oldPairs), newPairs)) =>
        val added = newPairs.toSet -- oldPairs
        if (added.isEmpty) None else Some((r, added.toIterable))
      case (r, (None, newPairs)) =>
        Some((r, newPairs))
    }

    // Rankers present in the new snapshot with no old record, together with their pairs.
    val rpairsNewly = joinLeft(newRankerPairs, oldRankerPairs) collect { case (r, (curr, None)) => (r, curr) }

    // TODO: we need both new rankers and the old ones!
    // oldRankerPairs,newRankerPairs should give the same result as
    // oldRankerPairs,rpairsSince (presumably rpairsAdded — confirm)
    // val rpairsUnion = joinLeft(oldRankerPairs,rpairsSince) map {
    //   case (r,(oldPairs,Some(newPairs))) =>
    //     val union = oldPairs.toSet ++ newPairs
    //     (r, union.toIterable)
    //   case (r,(oldPairs,_)) => (r,oldPairs)
    // }
    //
    // val rpairsTotal = rpairsUnion ++ rpairsNewly

    // The added pairs are always written. The newly-registered rankers are written in the
    // same job only when --newly was supplied, so the output matches what toString announces.
    val addedOutput = toTextFile(rpairsAdded map Pipeline.rankerPairsStr, o.addedRankerPairsFile())
    o.newlyRankerPairsFile.get match {
      case Some(newlyPath) =>
        persist(addedOutput, toTextFile(rpairsNewly map Pipeline.rankerPairsStr, newlyPath))
      case None =>
        persist(addedOutput)
    }
    // toTextFile(rpairsTotal map Pipeline.rankerPairsStr, o.totalRankerPairsFile())
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment