Skip to content

Instantly share code, notes, and snippets.

@schmmd
Created June 12, 2013 16:19
Show Gist options
  • Save schmmd/5766839 to your computer and use it in GitHub Desktop.
Save schmmd/5766839 to your computer and use it in GitHub Desktop.
Scoobi multi-stage MapReduce (MR) bug with LZO compression.
import AssemblyKeys._ // put this at the top of the file

// Enable the sbt-assembly settings for this project.
assemblySettings

scalaVersion := "2.10.1"

// Scoobi itself plus the hadoop-lzo artifact used by the job's input format.
libraryDependencies ++= Seq(
  "com.nicta" %% "scoobi" % "0.7.0-RC2-cdh3",
  "org.apache.hadoop" % "hadoop-lzo" % "0.4.13"
)

// Extra repositories consulted when resolving the dependencies above.
resolvers ++= Seq(
  "nicta" at "http://nicta.github.com/scoobi/releases",
  "cloudera" at "https://repository.cloudera.com/content/repositories/releases",
  "Sonatype snapshots" at "http://oss.sonatype.org/content/repositories/snapshots/"
)

// Wrap the inherited assembly merge strategy: wherever it would answer
// MergeStrategy.deduplicate, answer MergeStrategy.first instead; every other
// answer is passed through unchanged.
mergeStrategy in assembly <<= (mergeStrategy in assembly) { inherited =>
  {
    case path =>
      val inheritedStrategy = inherited(path)
      if (inheritedStrategy != MergeStrategy.deduplicate) inheritedStrategy
      else MergeStrategy.first
  }
}
package com.nicta.scoobi.examples
import com.nicta.scoobi.Scoobi._
import com.nicta.scoobi.io.text._
import com.hadoop.mapreduce.LzoTextInputFormat
/**
 * Scoobi application that chains two group-by stages over tab-separated text.
 *
 * `args(0)` is handed to the `TextSource` as its only input and is read with
 * `LzoTextInputFormat`; `args(1)` is the text-file output location, written
 * with `overwrite = true`.
 */
object Job extends ScoobiApp {
  def run(): Unit = {
    // TextSource's inputFormat parameter is given a Class[TextInputFormat], so
    // the LZO input format class is cast to that type.
    val lzoFormat = classOf[LzoTextInputFormat]
      .asInstanceOf[Class[org.apache.hadoop.mapreduce.lib.input.TextInputFormat]]
    val source = new TextSource(Seq(args(0)), inputFormat = lzoFormat)
    val lines: DList[String] = TextInput.fromTextSource(source)

    // Each line must split on tabs into exactly two fields; any other shape
    // fails the match with a MatchError.
    val pairs = lines.map { line =>
      line.split("\t") match {
        case Array(first, second) => (first, second)
      }
    }

    // Stage 1: group by the first field and count the entries per group.
    val countsByFirst = pairs.groupBy(pair => pair._1).map { case (first, entries) =>
      (first, entries.size)
    }

    // Stage 2: regroup by that count and emit one tab-separated line per count,
    // starting with the count and followed by the grouped (field, count) pairs.
    val linesByCount = countsByFirst.groupBy(entry => entry._2).map { case (count, entries) =>
      (Iterable(count) ++ entries).mkString("\t")
    }

    val output = linesByCount.toTextFile(args(1), overwrite = true)
    persist(output)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment