Skip to content

Instantly share code, notes, and snippets.

@alexy
Created April 9, 2012 23:58
Show Gist options
  • Save alexy/2347476 to your computer and use it in GitHub Desktop.
Save alexy/2347476 to your computer and use it in GitHub Desktop.
reusing Scoobi intermediates
// Edge list, read from one of two on-disk representations of o.soiByFile():
//  - with o.fromKey(): the keys of a sequence file, each a comma-separated string
//    whose fields at index 4 and index 1 (in that order) are parsed as Longs;
//  - otherwise: a text file delimited by o.sep whose first two columns are
//    extracted as Longs.
val soiBy1: DList[Edge] =
  if (!o.fromKey())
    fromDelimitedTextFile(o.soiByFile(), o.sep) {
      case Long(u) :: Long(v) :: _ => (u, v)
    }
  else {
    val keys: DList[String] = convertKeyFromSequenceFile(o.soiByFile())
    keys.map { key =>
      val fields = key.split(",")
      (fields(4).toLong, fields(1).toLong)
    }
  }

// With o.flip() the swapped copy of every edge is appended to the list;
// without it the edges are used exactly as loaded.
val soiBy2: DList[Edge] =
  if (!o.flip()) soiBy1
  else soiBy1 ++ soiBy1.map(_.swap)
// Optional cut-off date built from o.date when it is present.
// The raw value is sliced at character positions 0-4, 4-6 and 6-8 and re-joined
// with dashes before being handed to DateTime (presumably a yyyyMMdd input —
// confirm against the option's documentation). A value shorter than eight
// characters makes substring throw.
val registeredSinceOpt =
  for (raw <- o.date.get) yield {
    val year  = raw.substring(0, 4)
    val month = raw.substring(4, 6)
    val day   = raw.substring(6, 8)
    new DateTime("%s-%s-%s".format(year, month, day))
  }
// val registeredSince = registeredSinceOpt.getOrElse(new DateTime("1970-01-01"))
// (id, createdAt) pairs read from the comma-delimited o.registeredFile():
// column 1 goes through the Long extractor, column 6 through Extract.DateTime,
// columns 2-5 and anything after column 6 are ignored.
val registeredAt: RegisteredAt =
  fromDelimitedTextFile(o.registeredFile(), ",") {
    // disabled filter kept from the original: if createdAt > registeredSince
    case Long(key) :: _ :: _ :: _ :: _ :: Extract.DateTime(stamp) :: _ =>
      (key, stamp)
  }

// Scores loaded from o.scoresFile().
val scores: Scores = Pipeline.readScores(o.scoresFile())

// Ranker pairs derived from the registration data, the (possibly flipped)
// edge list, the scores and the pair-generation parameters.
val rpairs: RankerPairs =
  Pipeline.rankerPairsFromRegisteredAt(registeredAt, soiBy2, scores, o.pairGenParams)

// The full set of pairs is always written to o.newRankerPairsFile().
Pipeline.saveRankerPairs(rpairs, o.newRankerPairsFile())
// Optional extra outputs. Nothing below runs unless a cut-off date was supplied;
// each output is additionally produced only when its own target file option is set.
registeredSinceOpt.foreach { date =>
  // Pairs restricted to keys whose registration timestamp is strictly after the cut-off.
  if (o.newlyRegisteredRankerPairsFile.get.isDefined) {
    val registeredAfterCutoff = join(rpairs, registeredAt) collect {
      case (ranker, (pairs, registered)) if registered > date => (ranker, pairs)
    }
    Pipeline.saveRankerPairs(registeredAfterCutoff, o.newlyRegisteredRankerPairsFile())
  }

  // Pairs that are present now but absent from the previously saved pairs.
  // NOTE(review): this branch does not use `date`, yet it only runs when a date
  // is given — confirm that the coupling is intended.
  if (o.addedRankerPairsFile.get.isDefined) {
    val previous = Pipeline.readRankerPairs(o.oldRankerPairsFile())
    val added: RankerPairs = joinLeft(rpairs, previous).map {
      // key seen before: keep only the pairs that were not in the old set
      case (ranker, (current, Some(old))) =>
        (ranker, current.toSet.diff(old.toSet).toIterable)
      // key not seen before: every current pair counts as added
      case (ranker, (current, _)) =>
        (ranker, current)
    }
    Pipeline.saveRankerPairs(added, o.addedRankerPairsFile())
  }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment