Skip to content

Instantly share code, notes, and snippets.

@knub
Last active August 29, 2015 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save knub/0bfec859a563009c1d57 to your computer and use it in GitHub Desktop.
Save knub/0bfec859a563009c1d57 to your computer and use it in GitHub Desktop.
Resolving redirect chains in Apache Flink, a bounded transitive closure computed with a delta iteration.
import org.apache.flink.api.scala._
/**
 * Flink program that resolves redirects in a link graph.
 *
 * Every link `from -> to` whose target is the source of a redirect is rewritten to point at that
 * redirect's target. Because redirects can chain (A -> B -> C), the rewrite is repeated inside a
 * Flink delta iteration. It stops when no link's current target is a redirect source any more, or
 * after [[maxRedirectHops]] rounds, whichever comes first. The round cap also bounds the work spent
 * on redirect cycles.
 *
 * NOTE(review): `buildProgram` is marked `override`, and `redirectPath`, `linksPath`,
 * `resolvedRedirectsPath` and `writeAsTsv` are not defined here. They presumably come from a parent
 * class/trait and a project-level implicit extension that this snippet omits. Confirm.
 *
 * NOTE(review): the case classes and `firstTwoFields` are members of this class, so the `map`
 * closures that use them reference the enclosing instance. Flink serializes UDF closures, so this
 * class presumably has to be serializable, or these members should move to a companion object.
 * Confirm.
 */
class RedirectResolvingProgram {

  /**
   * A link whose target may be rewritten while redirects are resolved.
   *
   * @param from   source of the link
   * @param origTo target as read from the input. Together with `from` it is the solution-set key
   *               of the delta iteration, so it must never change.
   * @param to     current, possibly already resolved, target
   */
  case class LinkWithOrig(from: String, origTo: String, to: String)

  /** A redirect from page `from` to page `to`. */
  case class Redirect(from: String, to: String)

  /**
   * Maximum number of delta-iteration rounds, i.e. the longest redirect chain that is followed.
   * Subclasses may override it.
   */
  protected val maxRedirectHops: Int = 10

  /**
   * Returns the first two tab-separated fields of `line`. Any further fields are ignored.
   *
   * @throws IllegalArgumentException if the line has fewer than two fields
   */
  private def firstTwoFields(line: String): (String, String) =
    line.split('\t') match {
      case Array(first, second, _*) => (first, second)
      case _ =>
        throw new IllegalArgumentException(
          s"Expected at least two tab-separated fields in line: '$line'")
    }

  override def buildProgram(env: ExecutionEnvironment): Unit = {
    val redirects = env.readTextFile(redirectPath).map { line =>
      val (from, to) = firstTwoFields(line)
      Redirect(from, to)
    }.name("Redirects")

    // Before resolution, every link's current target equals its original target.
    val links = env.readTextFile(linksPath).map { line =>
      val (from, to) = firstTwoFields(line)
      LinkWithOrig(from, to, to)
    }.name("Links")

    /*
     * One round of the delta iteration.
     *
     * Links in the workset whose current target is the source of a redirect are moved to that
     * redirect's target. The rewritten links serve two roles:
     *   - the solution-set delta, replacing the entries with the same (from, origTo) key;
     *   - the next workset.
     * Links that matched no redirect drop out of the inner join. The iteration therefore ends as
     * soon as no link target is a redirect source.
     */
    def resolveOneHop(
        solutionSet: DataSet[LinkWithOrig], // not read: updates are applied by key only
        workset: DataSet[LinkWithOrig]): (DataSet[LinkWithOrig], DataSet[LinkWithOrig]) = {
      val resolved = redirects.join(workset)
        .where { _.from }
        .equalTo { _.to }
        .apply { (redirect, link) => link.copy(to = redirect.to) }
        .name("Resolved-Redirects-From-Iteration")
      (resolved, resolved)
    }

    // `links` seeds both the initial solution set and the initial workset.
    // The solution set is keyed by (from, origTo), which stays fixed while `to` is rewritten.
    val resolvedRedirects = links
      .iterateDelta(links, maxRedirectHops, Array("from", "origTo"))(resolveOneHop)
      .name("Resolved-Redirects")
      .map { link => (link.from, link.to) }
      .name("Final-Redirect-Result")

    resolvedRedirects.writeAsTsv(resolvedRedirectsPath)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment